This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b8cb51f6a [INLONG-5251][SDK] Add proxy client config and proxylist 
manager for DataProxy C++ SDK (#5252)
b8cb51f6a is described below

commit b8cb51f6a2a51c8247c015c19924e7e9441b5fb3
Author: xueyingzhang <[email protected]>
AuthorDate: Fri Jul 29 22:51:20 2022 +0800

    [INLONG-5251][SDK] Add proxy client config and proxylist manager for 
DataProxy C++ SDK (#5252)
---
 .../dataproxy-sdk-cpp/src/base/client_config.cc    | 701 +++++++++++++++
 .../dataproxy-sdk-cpp/src/base/client_config.h     | 113 +++
 .../dataproxy-sdk-cpp/src/base/proxylist_config.cc | 990 +++++++++++++++++++++
 .../dataproxy-sdk-cpp/src/base/proxylist_config.h  | 201 +++++
 4 files changed, 2005 insertions(+)

diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.cc
new file mode 100644
index 000000000..cbebd1372
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.cc
@@ -0,0 +1,701 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "client_config.h"
+
+#include <rapidjson/document.h>
+
+#include "sdk_constant.h"
+#include "logger.h"
+#include "utils.h"
+
+namespace dataproxy_sdk
+{
+    bool ClientConfig::parseConfig()
+    {
+        std::string file_content;
+        if (!Utils::readFile(config_path_, file_content))
+        {
+            return false;
+        }
+
+        rapidjson::Document root;
+
+        if (root.Parse(file_content.c_str()).HasParseError())
+        {
+            LOG_ERROR("failed to parse user config file: %s", 
config_path_.c_str());
+            return false;
+        }
+
+        if (!root.HasMember("init-param"))
+        {
+            LOG_ERROR("param is not an object");
+            return false;
+        }
+        const rapidjson::Value &doc = root["init-param"];
+
+        // thread_num
+        if (doc.HasMember("thread_num") && doc["thread_num"].IsInt() && 
doc["thread_num"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["thread_num"];
+
+            thread_nums_ = obj.GetInt();
+            LOG_WARN("thread_num in user config  is: %d", thread_nums_);
+        }
+        else
+        {
+            thread_nums_ = constants::kThreadNums;
+            LOG_WARN("thread_num in user config is not expect, then use 
default: %d", thread_nums_);
+        }
+        // shared_buf_nums
+        if (doc.HasMember("shared_buf_num") && doc["shared_buf_num"].IsInt() 
&& doc["shared_buf_num"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["shared_buf_num"];
+
+            shared_buf_nums_ = obj.GetInt();
+            LOG_WARN("shared_buf_num in user config  is: %d", 
shared_buf_nums_);
+        }
+        else
+        {
+            shared_buf_nums_ = constants::kSharedBufferNums;
+            LOG_WARN("shared_buf_num in user config is not expect, then use 
default: %d", shared_buf_nums_);
+        }
+
+        // inlong_group_ids, split by comma
+        if (doc.HasMember("inlong_group_ids") && 
doc["inlong_group_ids"].IsString())
+        {
+            const rapidjson::Value &obj = doc["inlong_group_ids"];
+            std::string groupIds_str = obj.GetString();
+            int32_t size = Utils::splitOperate(groupIds_str, 
inlong_group_ids_, ",");
+            LOG_WARN("inlong_group_ids in user config is: <%s>", 
Utils::getVectorStr(inlong_group_ids_).c_str());
+        }
+        else
+        {
+            LOG_WARN("inlong_group_ids in user config is empty");
+        }
+
+        // enable_groupId_isolation
+        if (doc.HasMember("enable_groupId_isolation") && 
doc["enable_groupId_isolation"].IsBool())
+        {
+            const rapidjson::Value &obj = doc["enable_groupId_isolation"];
+
+            enable_groupId_isolation_ = obj.GetBool();
+            LOG_WARN("enable_groupId_isolation in user config  is: %s", 
enable_groupId_isolation_ ? "true" : "false");
+        }
+        else
+        {
+            enable_groupId_isolation_ = constants::kEnableBidIsolation;
+            LOG_WARN("enable_groupId_isolation in user config is not expect, 
then use default: %s", enable_groupId_isolation_ ? "true" : "false");
+        }
+
+        // if enable_groupId_isolation_ is true, groupIds cann't be empty
+        if (enable_groupId_isolation_ && inlong_group_ids_.empty())
+        {
+            LOG_ERROR("inlong_group_ids is empty, check config!");
+            return false;
+        }
+
+        // auth settings
+        if (doc.HasMember("need_auth") && doc["need_auth"].IsBool() && 
doc["need_auth"].GetBool()){
+            if (!doc.HasMember("auth_id") || !doc.HasMember("auth_key") || 
!doc["auth_id"].IsString() || !doc["auth_key"].IsString() 
||doc["auth_id"].IsNull() || doc["auth_key"].IsNull())
+            {
+                LOG_ERROR("need_auth, but auth_id or auth_key is empty or not 
string type, check config!");
+                return false;
+            }
+            need_auth_ = true;
+            auth_id_ = doc["auth_id"].GetString();
+            auth_key_ = doc["auth_key"].GetString();
+            LOG_WARN("need_auth, auth_id:%s, auth_key:%s", auth_id_.c_str(), 
auth_key_.c_str());        
+        }
+        else
+        {
+            need_auth_ = constants::kNeedAuth;
+            LOG_WARN("need_auth is not expect, then use default:%s", 
need_auth_ ? "true" : "false");
+        }
+
+        // buffer_num_per_groupId
+        if (doc.HasMember("buffer_num_per_groupId") && 
doc["buffer_num_per_groupId"].IsInt() && doc["buffer_num_per_groupId"].GetInt() 
> 0)
+        {
+            const rapidjson::Value &obj = doc["buffer_num_per_groupId"];
+
+            buffer_num_per_groupId_ = obj.GetInt();
+            LOG_WARN("buffer_num_per_groupId in user config  is: %d", 
buffer_num_per_groupId_);
+        }
+        else
+        {
+            buffer_num_per_groupId_ = constants::kBufferNumPerBid;
+            LOG_WARN("buffer_num_per_groupId in user config is not expect, 
then use default: %d", buffer_num_per_groupId_);
+        }
+
+        // ser_ip
+        if (doc.HasMember("ser_ip") && doc["ser_ip"].IsString())
+        {
+            const rapidjson::Value &obj = doc["ser_ip"];
+
+            ser_ip_ = obj.GetString();
+            LOG_WARN("ser_ip in user config  is: %s", ser_ip_.c_str());
+        }
+        else
+        {
+            ser_ip_ = constants::kSerIP;
+            LOG_WARN("ser_ip in user config is not expect, then use default: 
%s", ser_ip_.c_str());
+        }
+        // enable_pack
+        if (doc.HasMember("enable_pack") && doc["enable_pack"].IsBool())
+        {
+            const rapidjson::Value &obj = doc["enable_pack"];
+
+            enable_pack_ = obj.GetBool();
+            LOG_WARN("enable_pack in user config  is: %s", enable_pack_ ? 
"true" : "false");
+        }
+        else
+        {
+            enable_pack_ = constants::kEnablePack;
+            LOG_WARN("enable_pack in user config is not expect, then use 
default: %s", enable_pack_ ? "true" : "false");
+        }
+        // pack_size
+        if (doc.HasMember("pack_size") && doc["pack_size"].IsInt() && 
doc["pack_size"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["pack_size"];
+
+            pack_size_ = obj.GetInt();
+            LOG_WARN("pack_size in user config  is: %d", pack_size_);
+        }
+        else
+        {
+            pack_size_ = constants::kPackSize;
+            LOG_WARN("pack_size in user config is not expect, then use 
default: %d ms", pack_size_);
+        }
+        // pack_timeout
+        if (doc.HasMember("pack_timeout") && doc["pack_timeout"].IsInt() && 
doc["pack_timeout"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["pack_timeout"];
+
+            pack_timeout_ = obj.GetInt();
+            LOG_WARN("pack_timeout in user config  is: %d ms", pack_timeout_);
+        }
+        else
+        {
+            pack_timeout_ = constants::kPackTimeout;
+            LOG_WARN("pack_timeout in user config is not expect, then use 
default: %d", pack_timeout_);
+        }
+        // ext_pack_size
+        if (doc.HasMember("ext_pack_size") && doc["ext_pack_size"].IsInt() && 
doc["ext_pack_size"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["ext_pack_size"];
+
+            ext_pack_size_ = obj.GetInt();
+            LOG_WARN("ext_pack_size in user config  is: %d", ext_pack_size_);
+        }
+        else
+        {
+            ext_pack_size_ = constants::kExtPackSize;
+            LOG_WARN("ext_pack_size in user config is not expect, then use 
default: %d", ext_pack_size_);
+        }
+        // enable_zip
+        if (doc.HasMember("enable_zip") && doc["enable_zip"].IsBool())
+        {
+            const rapidjson::Value &obj = doc["enable_zip"];
+
+            enable_zip_ = obj.GetBool();
+            LOG_WARN("enable_zip in user config  is: %s", enable_zip_ ? "true" 
: "false");
+        }
+        else
+        {
+            enable_zip_ = constants::kEnablePack;
+            LOG_WARN("enable_zip in user config is not expect, then use 
default: %s", enable_zip_ ? "true" : "false");
+        }
+        // min_zip_len
+        if (doc.HasMember("min_zip_len") && doc["min_zip_len"].IsInt() && 
doc["min_zip_len"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["min_zip_len"];
+
+            min_zip_len_ = obj.GetInt();
+            LOG_WARN("min_zip_len in user config  is: %d", min_zip_len_);
+        }
+        else
+        {
+            min_zip_len_ = constants::kMinZipLen;
+            LOG_WARN("min_zip_len in user config is not expect, then use 
default: %d", min_zip_len_);
+        }
+        // enable_retry
+        if (doc.HasMember("enable_retry") && doc["enable_retry"].IsBool())
+        {
+            const rapidjson::Value &obj = doc["enable_retry"];
+
+            enable_retry_ = obj.GetBool();
+            LOG_WARN("enable_retry in user config  is: %s", enable_retry_ ? 
"true" : "false");
+        }
+        else
+        {
+            enable_retry_ = constants::kEnableRetry;
+            LOG_WARN("enable_retry in user config is not expect, then use 
default: %s", enable_retry_ ? "true" : "false");
+        }
+        // retry_interval
+        if (doc.HasMember("retry_ms") && doc["retry_ms"].IsInt() && 
doc["retry_ms"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["retry_ms"];
+
+            retry_interval_ = obj.GetInt();
+            LOG_WARN("retry_interval in user config  is: %d ms", 
retry_interval_);
+        }
+        else
+        {
+            retry_interval_ = constants::kRetryInterval;
+            LOG_WARN("retry_interval in user config is not expect, then use 
default: %d ms", retry_interval_);
+        }
+        // retry_num
+        if (doc.HasMember("retry_num") && doc["retry_num"].IsInt() && 
doc["retry_num"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["retry_num"];
+
+            retry_num_ = obj.GetInt();
+            LOG_WARN("retry_num in user config  is: %d times", retry_num_);
+        }
+        else
+        {
+            retry_num_ = constants::kRetryNum;
+            LOG_WARN("retry_num in user config is not expect, then use 
default: %d times", retry_num_);
+        }   
+        // log_num
+        if (doc.HasMember("log_num") && doc["log_num"].IsInt() && 
doc["log_num"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["log_num"];
+
+            log_num_ = obj.GetInt();
+            LOG_WARN("log_num in user config  is: %d", log_num_);
+        }
+        else
+        {
+            log_num_ = constants::kLogNum;
+            LOG_WARN("log_num in user config is not expect, then use default: 
%d", log_num_);
+        }
+        // log_size
+        if (doc.HasMember("log_size") && doc["log_size"].IsInt() && 
doc["log_size"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["log_size"];
+
+            log_size_ = obj.GetInt();
+            LOG_WARN("log_size in user config  is: %dM", log_size_);
+        }
+        else
+        {
+            log_size_ = constants::kLogSize;
+            LOG_WARN("log_size in user config is not expect, then use default: 
%dM", log_size_);
+        }
+        // log_level
+        if (doc.HasMember("log_level") && doc["log_level"].IsInt() && 
doc["log_level"].GetInt() >= 0 && doc["log_level"].GetInt() <= 4)
+        {
+            const rapidjson::Value &obj = doc["log_level"];
+
+            log_level_ = obj.GetInt();
+            LOG_WARN("log_level in user config  is: %d", log_level_);
+        }
+        else
+        {
+            log_level_ = constants::kLogLevel;
+            LOG_WARN("log_level in user config is not expect, then use 
default: %d", log_level_);
+        }
+        // log_file_type
+        if (doc.HasMember("log_file_type") && doc["log_file_type"].IsInt() && 
doc["log_file_type"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["log_file_type"];
+
+            log_file_type_ = obj.GetInt();
+            LOG_WARN("log_file_type in user config  is: %d", log_file_type_);
+        }
+        else
+        {
+            log_file_type_ = constants::kLogFileType;
+            LOG_WARN("log_file_type in user config is not expect, then use 
default: %d", log_file_type_);
+        }
+        // log_path
+        if (doc.HasMember("log_path") && doc["log_path"].IsString())
+        {
+            const rapidjson::Value &obj = doc["log_path"];
+
+            log_path_ = obj.GetString();
+            LOG_WARN("log_path in user config  is: %s", log_path_.c_str());
+        }
+        else
+        {
+            log_path_ = constants::kLogPath;
+            LOG_WARN("log_path in user config is not expect, then use default: 
%s", log_path_.c_str());
+        }
+        // log_enable_limit
+        if (doc.HasMember("log_enable_limit") && 
doc["log_enable_limit"].IsBool())
+        {
+            const rapidjson::Value &obj = doc["log_enable_limit"];
+
+            log_enable_limit_ = obj.GetBool();
+            LOG_WARN("log_enable_limit in user config  is: %s", 
log_enable_limit_ ? "true" : "false");
+        }
+        else
+        {
+            log_enable_limit_ = constants::kLogEnableLimit;
+            LOG_WARN("log_enable_limit in user config is not expect, then use 
default: %s", log_enable_limit_ ? "true" : "false");
+        }
+        // proxy_URL
+        if (doc.HasMember("proxy_cfg_preurl") && 
doc["proxy_cfg_preurl"].IsString())
+        {
+            const rapidjson::Value &obj = doc["proxy_cfg_preurl"];
+
+            proxy_URL_ = obj.GetString();
+            LOG_WARN("proxy_cfg_preurl in user config  is: %s", 
proxy_URL_.c_str());
+        }
+        else if (doc.HasMember("bus_cfg_preurl") && 
doc["bus_cfg_preurl"].IsString()) // compatible with internal usage
+        {
+            const rapidjson::Value &obj = doc["bus_cfg_preurl"];
+
+            proxy_URL_ = obj.GetString();
+            LOG_WARN("proxy_cfg_preurl in user config  is: %s", 
proxy_URL_.c_str());
+        }
+        
+        else
+        {
+            proxy_URL_ = constants::kProxyURL;
+            LOG_WARN("proxy_cfg_url in user config is not expect, then use 
default: %s", proxy_URL_.c_str());
+        }
+        // proxy_cluster_URL_, only internal usage
+        if (doc.HasMember("bus_cfg_url") && doc["bus_cfg_url"].IsString())
+        {
+            const rapidjson::Value &obj = doc["bus_cfg_url"];
+
+            proxy_cluster_URL_ = obj.GetString();
+            LOG_WARN("proxy_cluster_URL(proxy_cfg_url) in user config  is: 
%s", proxy_cluster_URL_.c_str());
+        }
+        else
+        {
+            proxy_cluster_URL_ = constants::kBusClusterURL;
+            LOG_WARN("proxy_cluster_URL(proxy_cfg_url) in user config is not 
expect, then use default: %s", proxy_cluster_URL_.c_str());
+        }
+        // enable_proxy_URL_from_cluster
+        if (doc.HasMember("enable_proxy_cfg_url") && 
doc["enable_proxy_cfg_url"].IsBool())
+        {
+            const rapidjson::Value &obj = doc["enable_proxy_cfg_url"];
+
+            enable_proxy_URL_from_cluster_ = obj.GetBool();
+            LOG_WARN("enable_proxy_URL_from_cluster in user config  is: %s", 
enable_proxy_URL_from_cluster_ ? "true" : "false");
+        }
+        else
+        {
+            enable_proxy_URL_from_cluster_ = 
constants::kEnableProxyURLFromCluster;
+            LOG_WARN("enable_proxy_URL_from_cluster in user config is not 
expect, then use default: %s", enable_proxy_URL_from_cluster_ ? "true" : 
"false");
+        }
+        // proxy_update_interval
+        if (doc.HasMember("proxy_update_interval") && 
doc["proxy_update_interval"].IsInt() && doc["proxy_update_interval"].GetInt() > 
0)
+        {
+            const rapidjson::Value &obj = doc["proxy_update_interval"];
+
+            proxy_update_interval_ = obj.GetInt();
+            LOG_WARN("proxy_update_interval in user config  is: %d minutes", 
proxy_update_interval_);
+        }
+        else if (doc.HasMember("bus_update_interval") && 
doc["bus_update_interval"].IsInt() && doc["bus_update_interval"].GetInt() > 
0)//internal usage
+        {
+            const rapidjson::Value &obj = doc["bus_update_interval"];
+
+            proxy_update_interval_ = obj.GetInt();
+            LOG_WARN("proxy_update_interval in user config  is: %d minutes", 
proxy_update_interval_);
+        }        
+        else
+        {
+            proxy_update_interval_ = constants::kProxyUpdateInterval;
+            LOG_WARN("proxy_update_interval in user config is not expect, then 
use default: %d minutes", proxy_update_interval_);
+        }
+        // proxy_URL_timeout
+        if (doc.HasMember("proxy_url_timeout") && 
doc["proxy_url_timeout"].IsInt() && doc["proxy_url_timeout"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["proxy_url_timeout"];
+
+            proxy_URL_timeout_ = obj.GetInt();
+            LOG_WARN("proxy_url_timeout in user config  is: %ds", 
proxy_URL_timeout_);
+        }
+        else if (doc.HasMember("bus_url_timeout") && 
doc["bus_url_timeout"].IsInt() && doc["bus_url_timeout"].GetInt() > 0) 
//internal usage
+        {
+            const rapidjson::Value &obj = doc["bus_url_timeout"];
+
+            proxy_URL_timeout_ = obj.GetInt();
+            LOG_WARN("proxy_url_timeout in user config  is: %ds", 
proxy_URL_timeout_);
+        }
+        else
+        {
+            proxy_URL_timeout_ = constants::kProxyURLTimeout;
+            LOG_WARN("proxy_url_timeout in user config is not expect, then use 
default: %ds", proxy_URL_timeout_);
+        }
+        // max_active_proxy_num
+        if (doc.HasMember("max_active_proxy") && 
doc["max_active_proxy"].IsInt() && doc["max_active_proxy"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["max_active_proxy"];
+
+            max_active_proxy_num_ = obj.GetInt();
+            LOG_WARN("max_active_proxy in user config  is: %d", 
max_active_proxy_num_);
+        }
+        else if (doc.HasMember("max_active_bus") && 
doc["max_active_bus"].IsInt() && doc["max_active_bus"].GetInt() > 0) //internal 
usage
+        {
+            const rapidjson::Value &obj = doc["max_active_bus"];
+
+            max_active_proxy_num_ = obj.GetInt();
+            LOG_WARN("max_active_proxy in user config  is: %d", 
max_active_proxy_num_);
+        }
+        else
+        {
+            max_active_proxy_num_ = constants::kMaxActiveProxyNum;
+            LOG_WARN("max_active_proxy in user config is not expect, then use 
default: %d", max_active_proxy_num_);
+        }
+        // max_buf_pool_
+        if (doc.HasMember("max_buf_pool") && doc["max_buf_pool"].IsUint() && 
doc["max_buf_pool"].GetUint() > 0)
+        {
+            const rapidjson::Value &obj = doc["max_buf_pool"];
+
+            max_buf_pool_ = obj.GetUint();
+            LOG_WARN("max_buf_pool in user config  is: %lu", max_buf_pool_);
+        }
+        else
+        {
+            max_buf_pool_ = constants::kMaxBufPool;
+            LOG_WARN("max_buf_pool in user config is not expect, then use 
default: %lu", max_buf_pool_);
+        }
+        // msg_type
+        if (doc.HasMember("msg_type") && doc["msg_type"].IsInt() && 
doc["msg_type"].GetInt() > 0 && doc["msg_type"].GetInt() < 9)
+        {
+            const rapidjson::Value &obj = doc["msg_type"];
+
+            msg_type_ = obj.GetInt();
+            LOG_WARN("msg_type in user config  is: %d", msg_type_);
+        }
+        else
+        {
+            msg_type_ = constants::kMsgType;
+            LOG_WARN("max_active_proxy in user config is not expect, then use 
default: %d", msg_type_);
+        }
+        // enable_TCP_nagle
+        if (doc.HasMember("enable_tcp_nagle") && 
doc["enable_tcp_nagle"].IsBool())
+        {
+            const rapidjson::Value &obj = doc["enable_tcp_nagle"];
+
+            enable_TCP_nagle_ = obj.GetBool();
+            LOG_WARN("enable_tcp_nagle in user config  is: %s", 
enable_TCP_nagle_ ? "true" : "false");
+        }
+        else
+        {
+            enable_TCP_nagle_ = constants::kEnableTCPNagle;
+            LOG_WARN("enable_tcp_nagle in user config is not expect, then use 
default: %s", enable_TCP_nagle_ ? "true" : "false");
+        }
+        // enable_heart_beat
+        if (doc.HasMember("enable_hb") && doc["enable_hb"].IsBool())
+        {
+            const rapidjson::Value &obj = doc["enable_hb"];
+
+            enable_heart_beat_ = obj.GetBool();
+            LOG_WARN("enable_hb in user config  is: %s", enable_heart_beat_ ? 
"true" : "false");
+        }
+        else
+        {
+            enable_heart_beat_ = constants::kEnableHeartBeat;
+            LOG_WARN("enable_hb in user config is not expect, then use 
default: %s", enable_heart_beat_ ? "true" : "false");
+        }
+
+        // heart_beat_interval
+        if (doc.HasMember("hb_interval") && doc["hb_interval"].IsInt() && 
doc["hb_interval"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["hb_interval"];
+
+            heart_beat_interval_ = obj.GetInt();
+            LOG_WARN("heart_beat_interval in user config  is: %ds", 
heart_beat_interval_);
+        }
+        else
+        {
+            heart_beat_interval_ = constants::kHeartBeatInterval;
+            LOG_WARN("heart_beat_interval in user config is not expect, then 
use default: %ds", heart_beat_interval_);
+        }
+        // enable_setaffinity
+        if (doc.HasMember("enable_setaffinity") && 
doc["enable_setaffinity"].IsBool())
+        {
+            const rapidjson::Value &obj = doc["enable_setaffinity"];
+
+            enable_setaffinity_ = obj.GetBool();
+            LOG_WARN("enable_setaffinity in user config  is: %s", 
enable_setaffinity_ ? "true" : "false");
+        }
+        else
+        {
+            enable_setaffinity_ = constants::kEnableSetAffinity;
+            LOG_WARN("enable_setaffinity in user config is not expect, then 
use default: %s", enable_setaffinity_ ? "true" : "false");
+        }
+        // mask_cpu_affinity
+        if (doc.HasMember("mask_cpuaffinity") && 
doc["mask_cpuaffinity"].IsInt() && doc["mask_cpuaffinity"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["mask_cpuaffinity"];
+
+            mask_cpu_affinity_ = obj.GetInt();
+            LOG_WARN("mask_cpuaffinity in user config  is: %d", 
mask_cpu_affinity_);
+        }
+        else
+        {
+            mask_cpu_affinity_ = constants::kMaskCPUAffinity;
+            LOG_WARN("mask_cpuaffinity in user config is not expect, then use 
default: %d", mask_cpu_affinity_);
+        }
+        // is_from_DC
+        if (doc.HasMember("enable_from_dc") && doc["enable_from_dc"].IsBool())
+        {
+            const rapidjson::Value &obj = doc["enable_from_dc"];
+
+            is_from_DC_ = obj.GetBool();
+            LOG_WARN("enable_from_dc in user config  is: %s", is_from_DC_ ? 
"true" : "false");
+        }
+        else
+        {
+            is_from_DC_ = constants::kIsFromDC;
+            LOG_WARN("enable_from_dc in user config is not expect, then use 
default: %s", is_from_DC_ ? "true" : "false");
+        }
+        // extend_field
+        if (doc.HasMember("extend_field") && doc["extend_field"].IsInt() && 
doc["extend_field"].GetInt() > 0)
+        {
+            const rapidjson::Value &obj = doc["extend_field"];
+
+            extend_field_ = obj.GetInt();
+            LOG_WARN("extend_field in user config  is: %d", extend_field_);
+        }
+        else
+        {
+            extend_field_ = constants::kExtendField;
+            LOG_WARN("extend_field in user config is not expect, then use 
default: %d", extend_field_);
+        }
+
+        // net_tag
+        if (doc.HasMember("net_tag") && doc["net_tag"].IsString())
+        {
+            const rapidjson::Value &obj = doc["net_tag"];
+
+            net_tag_ = obj.GetString();
+            LOG_WARN("net_tag in user config  is: %s", net_tag_.c_str());
+        }
+        else
+        {
+            net_tag_ = constants::kNetTag;
+            LOG_WARN("net_tag in user config is not expect, then use default: 
%s", net_tag_.c_str());
+        }
+
+        // set bufNum
+        buf_size_ = ext_pack_size_ + 400;
+        buf_num_ = max_buf_pool_ / (buf_size_);
+        LOG_WARN("sendBuf num of a pool is %d", buf_num_);
+
+        return true;
+    }
+
+    /**
+     * Resets the settings to the built-in defaults from constants:: and marks the
+     * user config as erroneous (user_config_err_ = true), which makes
+     * showClientConfig() report that defaults are in use.
+     *
+     * inlong_group_ids_, auth_id_ and auth_key_ are left untouched.
+     */
+    void ClientConfig::defaultInit()
+    {
+        user_config_err_ = true;
+
+        thread_nums_ = constants::kThreadNums;
+        shared_buf_nums_ = constants::kSharedBufferNums;
+        enable_groupId_isolation_ = constants::kEnableBidIsolation;
+        buffer_num_per_groupId_ = constants::kBufferNumPerBid;
+        net_tag_ = constants::kNetTag;
+
+        // need_auth_ is printed by showClientConfig(), so it needs a defined value on this path too
+        need_auth_ = constants::kNeedAuth;
+
+        // pack
+        enable_pack_ = constants::kEnablePack;
+        pack_size_ = constants::kPackSize;
+        pack_timeout_ = constants::kPackTimeout;
+        ext_pack_size_ = constants::kExtPackSize;
+
+        // zip
+        enable_zip_ = constants::kEnableZip;
+        min_zip_len_ = constants::kMinZipLen;
+
+        // retry
+        enable_retry_ = constants::kEnableRetry;
+        retry_interval_ = constants::kRetryInterval;
+        retry_num_ = constants::kRetryNum;
+
+        // log
+        log_num_ = constants::kLogNum;
+        log_size_ = constants::kLogSize;
+        log_level_ = constants::kLogLevel;
+        log_file_type_ = constants::kLogFileType;
+        log_path_ = constants::kLogPath;
+        log_enable_limit_ = constants::kLogEnableLimit;
+
+        // proxy list
+        proxy_URL_ = constants::kProxyURL;
+        enable_proxy_URL_from_cluster_ = constants::kEnableProxyURLFromCluster;
+        proxy_cluster_URL_ = constants::kBusClusterURL;
+        proxy_update_interval_ = constants::kProxyUpdateInterval;
+        proxy_URL_timeout_ = constants::kProxyURLTimeout;
+        max_active_proxy_num_ = constants::kMaxActiveProxyNum;
+
+        // network and miscellaneous
+        ser_ip_ = constants::kSerIP;
+        max_buf_pool_ = constants::kMaxBufPool;
+        msg_type_ = constants::kMsgType;
+        enable_TCP_nagle_ = constants::kEnableTCPNagle;
+        enable_heart_beat_ = constants::kEnableHeartBeat;
+        heart_beat_interval_ = constants::kHeartBeatInterval;
+        enable_setaffinity_ = constants::kEnableSetAffinity;
+        mask_cpu_affinity_ = constants::kMaskCPUAffinity;
+        is_from_DC_ = constants::kIsFromDC;
+        extend_field_ = constants::kExtendField;
+
+        // derived values, computed the same way as at the end of parseConfig()
+        buf_size_ = ext_pack_size_ + 400;
+        buf_num_ = max_buf_pool_ / (buf_size_);
+    }
+
+    /**
+     * Dumps the effective client configuration through LOG_WARN, one setting per line.
+     *
+     * When user_config_err_ is set, an error line is emitted first to say that the
+     * default values are in use. The auth key is masked because it is a credential.
+     */
+    void ClientConfig::showClientConfig()
+    {
+        if (user_config_err_)
+        {
+            LOG_ERROR("dataproxy_sdk_cpp init user config err, then use default config values");
+        }
+
+        LOG_WARN("thread_num: %d", thread_nums_);
+        LOG_WARN("shared_buf_num: %d", shared_buf_nums_);
+        LOG_WARN("inlong_group_ids: <%s>", Utils::getVectorStr(inlong_group_ids_).c_str());
+        LOG_WARN("enable_groupId_isolation: %s", enable_groupId_isolation_ ? "true" : "false");
+        LOG_WARN("buffer_num_per_groupId: %d", buffer_num_per_groupId_);
+        LOG_WARN("ser_ip: %s", ser_ip_.c_str());
+        LOG_WARN("enable_pack: %s", enable_pack_ ? "true" : "false");
+        // the pack/zip sizes and the pack timeout are declared uint32_t, hence %u
+        LOG_WARN("pack_size: %u", pack_size_);
+        LOG_WARN("pack_timeout: %ums", pack_timeout_);
+        LOG_WARN("ext_pack_size: %u", ext_pack_size_);
+        LOG_WARN("enable_zip: %s", enable_zip_ ? "true" : "false");
+        LOG_WARN("min_zip_len: %u", min_zip_len_);
+        LOG_WARN("enable_retry: %s", enable_retry_ ? "true" : "false");
+        LOG_WARN("retry_interval: %dms", retry_interval_);
+        LOG_WARN("retry_num: %d times", retry_num_);
+        LOG_WARN("log_num: %d", log_num_);
+        LOG_WARN("log_size: %dM", log_size_);
+        LOG_WARN("log_level: %d", log_level_);
+        LOG_WARN("log_file_type: %d", log_file_type_);
+        LOG_WARN("log_path: %s", log_path_.c_str());
+        LOG_WARN("log_enable_limit: %s", log_enable_limit_ ? "true" : "false");
+        LOG_WARN("proxy_cfg_preurl: %s", proxy_URL_.c_str());
+        LOG_WARN("net_tag: %s", net_tag_.c_str());
+        LOG_WARN("proxy_cluster_URL(proxy_cfg_url): %s", proxy_cluster_URL_.c_str());
+        LOG_WARN("enable_proxy_URL_from_cluster: %s", enable_proxy_URL_from_cluster_ ? "true" : "false");
+        LOG_WARN("proxy_update_interval: %d minutes", proxy_update_interval_);
+        LOG_WARN("proxy_url_timeout: %ds", proxy_URL_timeout_);
+        LOG_WARN("max_active_proxy: %d", max_active_proxy_num_);
+        LOG_WARN("max_buf_pool: %lu", max_buf_pool_);
+        LOG_WARN("msg_type: %d", msg_type_);
+        LOG_WARN("enable_tcp_nagle: %s", enable_TCP_nagle_ ? "true" : "false");
+        LOG_WARN("enable_hb: %s", enable_heart_beat_ ? "true" : "false");
+        LOG_WARN("heart_beat_interval: %ds", heart_beat_interval_);
+        LOG_WARN("enable_setaffinity: %s", enable_setaffinity_ ? "true" : "false");
+        LOG_WARN("mask_cpuaffinity: %d", mask_cpu_affinity_);
+        LOG_WARN("enable_from_dc: %s", is_from_DC_ ? "true" : "false");
+        LOG_WARN("extend_field: %d", extend_field_);
+        LOG_WARN("sendBuf num of a pool: %d", buf_num_);
+        LOG_WARN("need_auth: %s", need_auth_ ? "true" : "false");
+        LOG_WARN("auth_id: %s", auth_id_.c_str());
+        // only reveal whether a key is configured, never its value
+        LOG_WARN("auth_key: %s", auth_key_.empty() ? "" : "******");
+    }
+} // namespace dataproxy_sdk
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.h
new file mode 100644
index 000000000..2e1823932
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/client_config.h
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_CLIENT_CONFIG_H_
+#define DATAPROXY_SDK_BASE_CLIENT_CONFIG_H_
+
+#include <stdint.h>
+#include <string>
+#include <vector>
+
+namespace dataproxy_sdk
+{
+  /**
+   * Configuration values of the DataProxy C++ SDK client.
+   *
+   * The members are plain data. parseConfig() and defaultInit() are defined
+   * outside this header and are presumably what populate them (confirm in
+   * client_config.cc). Scalar members carry zero/false initialisers so that
+   * no member holds an indeterminate value before that happens.
+   */
+  class ClientConfig
+  {
+  private:
+    std::string config_path_;      // path handed to the constructor
+    bool user_config_err_ = false;
+    int32_t buf_num_ = 0;          // sendbuf num of each bufpool, max_buf_pool_/(ext_pack_size_+400)
+
+  public:
+    // new parameters
+    int32_t thread_nums_ = 0;                   // network thread nums
+    int32_t shared_buf_nums_ = 0;               // bufpool nums in shared method // TODO: cancel this, using buffer_num_per_groupId
+    std::vector<std::string> inlong_group_ids_; // groupId set // TODO: using set instead of vector?
+    bool enable_groupId_isolation_ = false;     // different groupId data are dispatched to different bufpool
+    int32_t buffer_num_per_groupId_ = 0;        // bufpool num of each groupId
+    std::string net_tag_;
+
+    // pack parameters
+    bool enable_pack_ = false;
+    uint32_t pack_size_ = 0;     // byte
+    uint32_t pack_timeout_ = 0;  // ms
+    uint32_t ext_pack_size_ = 0; // byte, max length of msg
+
+    // zip parameters
+    bool enable_zip_ = false;
+    uint32_t min_zip_len_ = 0; // NOTE(review): original comment said "ms"; the name suggests a length (bytes?) - confirm
+
+    // resend parameters
+    bool enable_retry_ = false;
+    uint32_t retry_interval_ = 0; // ms, resend interval if not receiving ack
+    uint32_t retry_num_ = 0;      // resend times
+
+    // log parameters
+    uint32_t log_num_ = 0;
+    uint32_t log_size_ = 0;     // M
+    uint8_t log_level_ = 0;     // trace(4)>debug(3)>info(2)>warn(1)>error(0)
+    uint8_t log_file_type_ = 0; // output type:2->file, 1->console
+    std::string log_path_;
+    bool log_enable_limit_ = false;
+
+    // proxy parameters
+    std::string proxy_URL_;
+    bool enable_proxy_URL_from_cluster_ = false;
+
+    std::string proxy_cluster_URL_;
+    uint32_t proxy_update_interval_ = 0; // minute
+    uint32_t proxy_URL_timeout_ = 0;     // second, request proxyList timeout
+    uint32_t max_active_proxy_num_ = 0;
+
+    // other parameters
+    std::string ser_ip_;               // local ip
+    uint32_t max_buf_pool_ = 0;        // byte, size of single bufpool
+    uint32_t msg_type_ = 0;
+    bool enable_TCP_nagle_ = false;
+    bool enable_heart_beat_ = false;
+    uint32_t heart_beat_interval_ = 0; // second
+    bool enable_setaffinity_ = false;  // cpu setaffinity
+    uint32_t mask_cpu_affinity_ = 0;   // cpu setaffinity mask
+    bool is_from_DC_ = false;          // sng_dc data
+    uint16_t extend_field_ = 0;        // bit flags, decoded by the helpers below
+
+    uint32_t buf_size_ = 0; // ext_pack_size+400;
+
+    // auth settings
+    bool need_auth_ = false;
+    std::string auth_id_;
+    std::string auth_key_;
+
+    // Stores the configuration file path; nothing is read here.
+    // Taken by const reference to avoid copying the string twice.
+    ClientConfig(const std::string &config_path) : config_path_(config_path) {}
+    bool parseConfig(); // return false if parse failed
+    void defaultInit();
+    void showClientConfig();
+
+    // bit 0x4 of extend_field_: use char type groupId / streamId
+    inline bool enableCharBid() const { return (extend_field_ & 0x4) != 0; }
+    // bit 0x2 of extend_field_
+    inline bool enableTraceIP() const { return (extend_field_ & 0x2) != 0; }
+    // data type msg: data_len|data
+    // true for msg_type_ 5, or msg_type_ >= 7 with bit 0x1 of extend_field_ clear
+    inline bool isNormalDataPackFormat() const
+    {
+      return (5 == msg_type_) || ((msg_type_ >= 7) && !(extend_field_ & 0x1));
+    }
+    // data&attr type msg: data_len|data|attr_len|attr
+    // true for msg_type_ 6, or msg_type_ >= 7 with bit 0x1 of extend_field_ set
+    inline bool isAttrDataPackFormat() const
+    {
+      return (6 == msg_type_) || ((msg_type_ >= 7) && (extend_field_ & 0x1));
+    }
+
+    inline int32_t bufNum() const { return buf_num_; }
+  };
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_CLIENT_CONFIG_H_
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.cc
new file mode 100644
index 000000000..2f46f2138
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.cc
@@ -0,0 +1,990 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proxylist_config.h"
+
+#include <algorithm>
+#include <chrono>
+#include <cstdlib>
+#include <functional>
+#include <iostream>
+#include <map>
+#include <rapidjson/document.h>
+#include <stdlib.h>
+
+#include "sdk_constant.h"
+#include "sdk_core.h"
+#include "executor_thread_pool.h"
+#include "ini_help.h"
+#include "logger.h"
+#include "pack_queue.h"
+#include "socket_connection.h"
+#include "tc_api.h"
+#include "utils.h"
+namespace dataproxy_sdk
+{
+    // Builds an empty proxy list: cluster id and proxy counts start at -1,
+    // size/load/switch value/message counter at 0, flags at false.
+    ClusterProxyList::ClusterProxyList()
+        : cluster_id_(-1),
+          size_(0),
+          load_(0),
+          is_inter_visit_(false),
+          switch_value_(0),
+          get_flag_(false),
+          msg_num_(0),
+          active_proxy_num_(-1),
+          backup_proxy_num_(-1)
+    {
+    }
+
+    // Releases the connections held by this list via clearAllConn().
+    ClusterProxyList::~ClusterProxyList()
+    {
+        clearAllConn();
+    }
+ 
+    // Reports whether this cluster should take part in load balancing.
+    // Returns false when heart beat is disabled or its interval is zero,
+    // when load_ is not positive, or when backup_proxy_num_ is zero.
+    bool ClusterProxyList::isNeedLoadBalance()
+    {
+        const bool heartbeat_usable = g_config->enable_heart_beat_ && g_config->heart_beat_interval_;
+        if (!heartbeat_usable)
+        {
+            return false;
+        }
+
+        // A negative active count is only reported; it does not change the result.
+        if (active_proxy_num_ < 0)
+        {
+            LOG_ERROR("active_proxy_num_:%d is negative", active_proxy_num_);
+        }
+
+        return !(load_ <= 0 || backup_proxy_num_ == 0);
+    }
+
+    // Appends proxy_info to proxylist_ and records it in unused_proxylist_
+    // under the key returned by its getString().
+    void ClusterProxyList::addBusAddress(const ProxyInfoPtr &proxy_info)
+    {
+        proxylist_.push_back(proxy_info);
+
+        const auto proxy_key = proxy_info->getString();
+        unused_proxylist_[proxy_key] = proxy_info;
+    }
+
+    // Registers every entry of proxylist_ in unused_proxylist_, keyed by its
+    // getString() value. emplace() leaves keys that already exist untouched.
+    void ClusterProxyList::initUnusedBus()
+    {
+        for (auto pos = proxylist_.begin(); pos != proxylist_.end(); ++pos)
+        {
+            const auto &proxy = *pos;
+            unused_proxylist_.emplace(proxy->getString(), proxy);
+        }
+    }
+
+    // Derives active_proxy_num_ and backup_proxy_num_ from the configured
+    // active count (default_set) and the number of proxies in this list (size_).
+    // The backup count is clamped to [0, constants::kBackupBusNum] and forced
+    // to 0 when isNeedLoadBalance() says the cluster is not balanced.
+    void ClusterProxyList::setActiveAndBackupBusNum(const int32_t &default_set)
+    {
+        active_proxy_num_ = std::min(default_set, size_);
+        LOG_INFO("active proxy num in config is %d, real active proxy num is %d, available proxy num is %d", default_set, active_proxy_num_, size_);
+
+        const auto spare_proxies = size_ - active_proxy_num_ - 1;
+        backup_proxy_num_ = std::max(0, std::min(spare_proxies, constants::kBackupBusNum));
+
+        if (!isNeedLoadBalance())
+        {
+            backup_proxy_num_ = 0;
+            LOG_INFO("cluster(id:%d) don't do balance, cluster load:%d, left_avaliable proxy num:%d", cluster_id_, load_, backup_proxy_num_);
+            return;
+        }
+
+        LOG_INFO("cluster(id:%d) need balance, backup proxy num is %d", cluster_id_, backup_proxy_num_);
+    }
+
+    // Creates the connection sets while holding the write lock on rwmutex_.
+    // Returns whatever createConnSet() returns (its error count).
+    int32_t ClusterProxyList::initConn()
+    {
+        unique_write_lock<read_write_mutex> write_guard(rwmutex_);
+        const int32_t failed = createConnSet();
+        return failed;
+    }
+
+    // Creates active_proxy_num_ connections for the active set and, when
+    // isNeedLoadBalance() is true, backup_proxy_num_ connections for the
+    // backup set.
+    // Returns the number of connections that could not be created.
+    int32_t ClusterProxyList::createConnSet()
+    {
+        int32_t err_count = 0;
+        for (int32_t i = 0; i < active_proxy_num_; ++i)
+        {
+            const auto conn = createRandomConnForActiveSet();
+            if (!conn)
+            {
+                // Count the failure and skip the log below: conn is null here.
+                ++err_count;
+                continue;
+            }
+            LOG_INFO("add conn%s in active_proxy_set", conn->getRemoteInfo().c_str());
+        }
+
+        if (isNeedLoadBalance())
+        {
+            for (int32_t i = 0; i < backup_proxy_num_; ++i)
+            {
+                const auto conn = createRandomConnForBackupSet();
+                if (!conn)
+                {
+                    ++err_count;
+                    continue;
+                }
+                LOG_INFO("add conn%s in backup_proxy_set", conn->getRemoteInfo().c_str());
+            }
+        }
+        return err_count;
+    }
+
+    // Closes every non-null connection in the active and backup sets and
+    // empties both sets, under the write lock on rwmutex_.
+    // Always returns 0. The original fell off the end of a non-void function,
+    // which is undefined behaviour.
+    int32_t ClusterProxyList::clearAllConn()
+    {
+        unique_write_lock<read_write_mutex> write_guard(rwmutex_);
+
+        for (auto &entry : active_proxy_set_)
+        {
+            if (entry.second)
+            {
+                entry.second->connClose();
+            }
+        }
+        active_proxy_set_.clear();
+
+        for (auto &entry : backup_proxy_set_)
+        {
+            if (entry.second)
+            {
+                entry.second->connClose();
+            }
+        }
+        backup_proxy_set_.clear();
+
+        return 0;
+    }
+
+    // Picks a connection from the active set for the next send.
+    //
+    // Two candidates are compared: the round-robin one (msg_num_ modulo set
+    // size) and a randomly drawn one; the one with fewer waiting sends wins
+    // (the random one on a tie). If the winner is stopped it is removed from
+    // the active set, a replacement is created from the unused proxies and the
+    // old proxy info is put back into unused_proxylist_.
+    //
+    // Returns nullptr when the active set is empty or no replacement could be
+    // created.
+    ConnectionPtr ClusterProxyList::getSendConn()
+    {
+        // Exclusive lock: this function bumps msg_num_ and may erase from /
+        // insert into active_proxy_set_ and unused_proxylist_.
+        unique_write_lock<read_write_mutex> wtlck(rwmutex_); // FIXME: is readlock ok?
+
+        if (active_proxy_set_.empty())
+        {
+            LOG_ERROR("cluster(id:%d) active_proxy_set is empty", cluster_id_);
+            return nullptr;
+        }
+
+        // Seed random() once. The original called srand() on every send:
+        // srand() is specified for rand(), not random(), and where the two
+        // share state, reseeding with a millisecond clock makes every call in
+        // the same millisecond draw the same index.
+        static const bool seeded = (srandom(static_cast<unsigned int>(Utils::getCurrentMsTime())), true);
+        (void)seeded;
+
+        ConnectionPtr res = nullptr;
+
+        ++msg_num_;
+        int32_t rr_idx = msg_num_ % active_proxy_set_.size();
+        auto rr_con = std::next(std::begin(active_proxy_set_), rr_idx)->second;
+
+        int32_t rand_idx = random() % active_proxy_set_.size();
+        auto rand_con = std::next(std::begin(active_proxy_set_), rand_idx)->second;
+
+        if (rr_con->getWaitingSend() < rand_con->getWaitingSend()) // choosing less waiting conn
+        {
+            res = rr_con;
+        }
+        else
+        {
+            res = rand_con;
+        }
+
+        if (res->isStop())
+        {
+            LOG_INFO("conn %s is closed, create new conn for send", res->getRemoteInfo().c_str());
+            active_proxy_set_.erase(res->getRemoteInfo());
+
+            auto old_proxyinfo = res->getBusInfo();
+
+            if (unused_proxylist_.empty()) // if there is no available proxy
+            {
+                // Nothing else to pick from: make the old proxy available
+                // first so the replacement can reconnect to it.
+                unused_proxylist_[old_proxyinfo->getString()] = old_proxyinfo;
+
+                res = createRandomConnForActiveSet();
+            }
+            else
+            {
+                // Pick the replacement among the other unused proxies, then
+                // return the old proxy to the unused list.
+                res = createRandomConnForActiveSet();
+
+                unused_proxylist_[old_proxyinfo->getString()] = old_proxyinfo;
+            }
+        }
+
+        return res;
+    }
+
+    // Creates a connection to a randomly chosen unused proxy and stores it in
+    // active_proxy_set_; forwards the result of createRandomConn().
+    ConnectionPtr ClusterProxyList::createRandomConnForActiveSet()
+    {
+        return createRandomConn(active_proxy_set_);
+    }
+
+    // Creates a connection to a randomly chosen unused proxy and stores it in
+    // backup_proxy_set_; forwards the result of createRandomConn().
+    ConnectionPtr ClusterProxyList::createRandomConnForBackupSet()
+    {
+        return createRandomConn(backup_proxy_set_);
+    }
+
+    // Creates a Connection to a randomly chosen entry of unused_proxylist_,
+    // stores it in conn_set under the proxy's getString() key and removes the
+    // proxy from unused_proxylist_.
+    //
+    // Returns nullptr when g_executors is not set, when no unused proxy is
+    // left even after dropping stopped connections, or when no executor is
+    // available.
+    ConnectionPtr ClusterProxyList::createRandomConn(std::unordered_map<std::string, ConnectionPtr> &conn_set)
+    {
+        if (!g_executors)
+            return nullptr;
+        if (unused_proxylist_.empty())
+        {
+            // All proxies may have dropped within a short time: reclaim the
+            // proxies of stopped connections first.
+            clearInvalidConns();
+        }
+        if (unused_proxylist_.empty())
+        {
+            LOG_ERROR("all proxyes in proxylist have already established connections, something is wrong!");
+            return nullptr;
+        }
+
+        // Seed random() once. The original called srand() on every call:
+        // srand() is specified for rand(), not random(), and where the two
+        // share state, reseeding with a millisecond clock makes every call in
+        // the same millisecond draw the same index.
+        static const bool seeded = (srandom(static_cast<unsigned int>(Utils::getCurrentMsTime())), true);
+        (void)seeded;
+
+        int32_t rand_idx = random() % unused_proxylist_.size();
+        auto proxy_info = std::next(std::begin(unused_proxylist_), rand_idx)->second;
+
+        // create conn
+        auto executor = g_executors->nextExecutor();
+        if (!executor)
+        {
+            return nullptr;
+        }
+        // std::make_shared never returns null (it throws on allocation
+        // failure), so the original null check after it was unreachable.
+        auto res = std::make_shared<Connection>(executor, proxy_info);
+
+        const auto proxy_key = proxy_info->getString();
+        LOG_DEBUG("create new connection: post %s connect request successfully on network_thread(id:%d)", proxy_key.c_str(),
+                  executor->threadId());
+        conn_set[proxy_key] = res;
+        unused_proxylist_.erase(proxy_key);
+        return res;
+    }
+
+    // Compares this list with a freshly fetched one, under a read lock.
+    // Returns true as soon as switch value, cluster id or size differs
+    // (checked in that order, logging the first difference), false otherwise.
+    bool ClusterProxyList::enableUpdate(const ClusterProxyListPtr &other)
+    {
+        unique_read_lock<read_write_mutex> read_guard(rwmutex_);
+
+        const bool switch_changed = (switch_value_ != other->switchValue());
+        if (switch_changed)
+        {
+            LOG_INFO("proxy ip switch_value is diff, new:%d, old:%d", other->switchValue(), switch_value_);
+            return true;
+        }
+
+        const bool cluster_changed = (cluster_id_ != other->clusterId());
+        if (cluster_changed)
+        {
+            LOG_INFO("proxy ip cluster_id is diff, new:%d, old:%d", other->clusterId(), cluster_id_);
+            return true;
+        }
+
+        const bool size_changed = (size_ != other->size());
+        if (size_changed)
+        {
+            LOG_INFO("proxy ip size is diff, new:%d, old:%d", other->size(), size_);
+            return true;
+        }
+
+        return false;
+    }
+
+    // Drops every stopped connection from the active and backup sets and puts
+    // the proxy it pointed at back into unused_proxylist_.
+    // No lock is taken here.
+    void ClusterProxyList::clearInvalidConns()
+    {
+        // sweep the active set
+        auto active_pos = active_proxy_set_.begin();
+        while (active_pos != active_proxy_set_.end())
+        {
+            const auto &conn = active_pos->second;
+            if (!conn->isStop())
+            {
+                ++active_pos;
+                continue;
+            }
+            LOG_INFO("active_proxy_set remove stop conn:%s", conn->getRemoteInfo().c_str());
+            // hand the proxy info back to the unused list before dropping the conn
+            unused_proxylist_.emplace(conn->getRemoteInfo(), conn->getBusInfo());
+            active_pos = active_proxy_set_.erase(active_pos);
+        }
+
+        // sweep the backup set
+        auto backup_pos = backup_proxy_set_.begin();
+        while (backup_pos != backup_proxy_set_.end())
+        {
+            const auto &conn = backup_pos->second;
+            if (!conn->isStop())
+            {
+                ++backup_pos;
+                continue;
+            }
+            LOG_INFO("backup_proxy_set_ remove stop conn:%s", conn->getRemoteInfo().c_str());
+            unused_proxylist_.emplace(conn->getRemoteInfo(), conn->getBusInfo());
+            backup_pos = backup_proxy_set_.erase(backup_pos);
+        }
+    }
+
+    // Sends a heart beat on every active and backup connection, under a read
+    // lock. The current isNeedLoadBalance() result is passed to each sendHB().
+    void ClusterProxyList::keepConnsAlive()
+    {
+        unique_read_lock<read_write_mutex> read_guard(rwmutex_);
+
+        for (const auto &entry : active_proxy_set_)
+        {
+            const bool need_balance = isNeedLoadBalance();
+            entry.second->sendHB(need_balance);
+        }
+
+        for (const auto &entry : backup_proxy_set_)
+        {
+            const bool need_balance = isNeedLoadBalance();
+            entry.second->sendHB(need_balance);
+        }
+    }
+
+    // Swaps the most loaded active connection with the least loaded backup
+    // connection when their average-load difference reaches load_.
+    //
+    // Runs under the write lock on rwmutex_. Does nothing when
+    // isNeedLoadBalance() is false or the backup set is empty. Connections
+    // whose getAvgLoad() is negative are not considered.
+    void ClusterProxyList::balanceConns()
+    {
+        std::map<std::string, int32_t> active_load;
+        std::map<std::string, int32_t> backup_load;
+        unique_write_lock<read_write_mutex> wtlck(rwmutex_);
+
+        if (!isNeedLoadBalance() || backup_proxy_set_.empty())
+            return; // no balancing needed
+
+        for (auto &it : active_proxy_set_)
+        {
+            int32_t avg_load = it.second->getAvgLoad();
+            if (avg_load >= 0)
+            {
+                active_load[it.first] = avg_load;
+                LOG_DEBUG("active conn:%s, avgLoad:%d", it.first.c_str(), avg_load);
+            }
+        }
+        for (auto &it : backup_proxy_set_)
+        {
+            int32_t avg_load = it.second->getAvgLoad();
+            if (avg_load >= 0)
+            {
+                backup_load[it.first] = avg_load;
+                LOG_DEBUG("backup conn:%s, avgLoad:%d", it.first.c_str(), avg_load);
+            }
+        }
+
+        // avg_load desc sort
+        std::vector<PAIR> active_list(active_load.begin(), active_load.end());
+        std::sort(active_list.begin(), active_list.end(), &Utils::downValueSort);
+
+        // avg_load asc sort
+        std::vector<PAIR> backup_list(backup_load.begin(), backup_load.end());
+        std::sort(backup_list.begin(), backup_list.end(), &Utils::upValueSort);
+
+        // Only the top pair is considered for a swap.
+        int32_t small_size = 1;
+        for (int32_t i = 0; i < small_size; i++)
+        {
+            if (active_list.empty() || backup_list.empty())
+                break;
+
+            if (active_list[i].second - backup_list[i].second >= load_) // do switch
+            {
+                LOG_INFO("do balance, active conn:%s, load:%d <--> backup conn:%s, load:%d", active_list[i].first.c_str(), active_list[i].second,
+                         backup_list[i].first.c_str(), backup_list[i].second);
+
+                // These keys live in the sorted vectors, not in the maps, so
+                // they stay valid across the erase calls below.
+                const std::string &active_key = active_list[i].first;
+                const std::string &backup_key = backup_list[i].first;
+
+                // Copy both shared_ptrs BEFORE erasing. The original kept a
+                // reference into active_proxy_set_ and used it after erasing
+                // that very element (dangling reference).
+                ConnectionPtr demoted = active_proxy_set_[active_key];
+                ConnectionPtr promoted = backup_proxy_set_[backup_key];
+
+                active_proxy_set_.erase(active_key);
+                backup_proxy_set_.erase(backup_key);
+
+                active_proxy_set_[backup_key] = promoted;
+                backup_proxy_set_[demoted->getRemoteInfo()] = demoted;
+            }
+        }
+    }
+
+    // Replaces the whole backup set: closes each existing backup connection,
+    // returns its proxy to unused_proxylist_, then creates backup_proxy_num_
+    // new backup connections. Runs under the write lock; does nothing when
+    // isNeedLoadBalance() is false.
+    void ClusterProxyList::updateBackupConns()
+    {
+        unique_write_lock<read_write_mutex> write_guard(rwmutex_);
+
+        if (!isNeedLoadBalance())
+            return;
+
+        // retire the current backup connections
+        for (const auto &entry : backup_proxy_set_)
+        {
+            const ConnectionPtr &conn = entry.second;
+            if (!conn)
+            {
+                continue;
+            }
+            LOG_DEBUG("update backup_conns regularly, close old conns and then create new conns");
+            conn->connClose();
+            // close and add this proxyinfo into unused
+            unused_proxylist_.emplace(conn->getRemoteInfo(), conn->getBusInfo());
+        }
+        backup_proxy_set_.clear();
+
+        // build the new backup connections
+        for (int created = 0; created < backup_proxy_num_; ++created)
+        {
+            const auto fresh = createRandomConnForBackupSet();
+            if (fresh)
+            {
+                LOG_DEBUG("new create conn%s in backup_proxy_set", fresh->getRemoteInfo().c_str());
+            }
+            else
+            {
+                LOG_ERROR("create backup conn error, check it");
+            }
+        }
+    }
+
+    // Creates the timer worker and arms the periodic timers: invalid-connection
+    // cleanup, heart beat (only when enabled with a positive interval),
+    // balancing, ack-number printing and backup refresh.
+    GlobalCluster::GlobalCluster()
+        : groupid2cluster_rwmutex_(),
+          update_flag_(false),
+          cond_mutex_(),
+          cond_(),
+          exit_flag_(false),
+          timer_worker_(std::make_shared<ExecutorThread>(100)),
+          clear_timer_(timer_worker_->createSteadyTimer()),
+          doBalance_timer_(timer_worker_->createSteadyTimer()),
+          updateBackup_timer_(timer_worker_->createSteadyTimer()),
+          printAckNum_timer_(timer_worker_->createSteadyTimer())
+    {
+        // cleanup of stopped connections
+        clear_timer_->expires_after(std::chrono::seconds(kClearTimerSecond));
+        clear_timer_->async_wait([this](const std::error_code &err) { this->clearInvalidConn(err); });
+
+        // heart beat timer exists only when heart beat is configured
+        const bool heartbeat_on = g_config->enable_heart_beat_ && g_config->heart_beat_interval_ > 0;
+        if (heartbeat_on)
+        {
+            keepAlive_timer_ = timer_worker_->createSteadyTimer();
+            keepAlive_timer_->expires_after(std::chrono::seconds(g_config->heart_beat_interval_));
+            keepAlive_timer_->async_wait([this](const std::error_code &err) { this->keepConnAlive(err); });
+        }
+
+        // load balancing
+        doBalance_timer_->expires_after(std::chrono::minutes(kDoBalanceMin));
+        doBalance_timer_->async_wait([this](const std::error_code &err) { this->doBalance(err); });
+
+        // ack statistics
+        printAckNum_timer_->expires_after(std::chrono::minutes(kPrintAckNumMin));
+        printAckNum_timer_->async_wait([this](const std::error_code &err) { this->printAckNum(err); });
+
+        // backup refresh, armed 3 seconds after kUpdateBackupSecond
+        updateBackup_timer_->expires_after(std::chrono::seconds(kUpdateBackupSecond + 3));
+        updateBackup_timer_->async_wait([this](const std::error_code &err) { this->updateBackup(err); });
+    }
+
+    // Closes the timer worker, asks the proxylist update thread to exit and
+    // joins it if it was started.
+    GlobalCluster::~GlobalCluster()
+    {
+        // FIXME: need other close work?
+        timer_worker_->close();
+        closeBuslistUpdate();
+
+        if (!worker_.joinable())
+        {
+            return;
+        }
+        worker_.join();
+    }
+
+    // Raises exit_flag_, then sets update_flag_ under cond_mutex_ and notifies
+    // cond_ so that a waiter on it wakes up and can observe the exit flag.
+    void GlobalCluster::closeBuslistUpdate()
+    {
+        exit_flag_ = true;
+
+        {
+            std::lock_guard<std::mutex> guard(cond_mutex_);
+            update_flag_ = true;
+        }
+        // notify after the mutex has been released
+        cond_.notify_one();
+    }
+
+    // Registers every configured group id with cluster id -1 and runs one
+    // doUpdate() pass. Always returns 0.
+    int32_t GlobalCluster::initBuslistAndCreateConns()
+    {
+        const auto &configured_ids = g_config->inlong_group_ids_;
+        for (auto pos = configured_ids.begin(); pos != configured_ids.end(); ++pos)
+        {
+            groupid2cluster_map_[*pos] = -1;
+        }
+
+        doUpdate();
+        return 0;
+    }
+
+    // Starts the background thread that runs updateSubroutine().
+    void GlobalCluster::startUpdateSubroutine()
+    {
+        worker_ = std::thread([this]() { updateSubroutine(); });
+    }
+
+    // FIXME: improve, using getconn err num, if it exceeds a limit, return errcode when user send next
+    //
+    // Resolves inlong_group_id -> cluster id -> cluster proxy list and asks
+    // that list for a send connection. Both maps are read under their read
+    // locks. Returns nullptr when either lookup fails.
+    ConnectionPtr GlobalCluster::getSendConn(const std::string &inlong_group_id)
+    {
+        unique_read_lock<read_write_mutex> group_guard(groupid2cluster_rwmutex_);
+        const auto group_pos = groupid2cluster_map_.find(inlong_group_id);
+        if (group_pos != groupid2cluster_map_.end())
+        {
+            unique_read_lock<read_write_mutex> cluster_guard(cluster_set_rwmutex_);
+            const auto cluster_pos = cluster_set_.find(group_pos->second);
+            if (cluster_pos != cluster_set_.end())
+            {
+                return cluster_pos->second->getSendConn();
+            }
+
+            LOG_ERROR("there is no cluster(id:%d) for inlong_group_id:%s in cluster_set, please check inlong_group_id/url or retry later ", group_pos->second, inlong_group_id.c_str());
+            return nullptr;
+        }
+
+        LOG_ERROR("there is no proxylist and connection for inlong_group_id:%s , please check inlong_group_id/url or retry later", inlong_group_id.c_str());
+        return nullptr;
+    }
+
+    // Creates a new active connection in the cluster that serves
+    // inlong_group_id. pool_id is not used by this function.
+    // Returns nullptr when the user is closing the sdk, when the group id or
+    // its cluster is unknown, or when the cluster cannot create a connection.
+    ConnectionPtr GlobalCluster::createActiveConn(const std::string &inlong_group_id, int32_t pool_id)
+    {
+        // if user is closing sdk
+        const bool user_is_exiting = (1 == user_exit_flag.get());
+        if (user_is_exiting)
+        {
+            return nullptr;
+        }
+
+        // inlong_group_id -> cluster_id
+        unique_read_lock<read_write_mutex> group_guard(groupid2cluster_rwmutex_);
+        const auto group_pos = groupid2cluster_map_.find(inlong_group_id);
+        if (group_pos == groupid2cluster_map_.end())
+        {
+            LOG_ERROR("there is no proxylist or avaliable connection for inlong_group_id:%s , please check inlong_group_id/url or retry later", inlong_group_id.c_str());
+            return nullptr;
+        }
+
+        // cluster_id -> proxylist
+        unique_read_lock<read_write_mutex> cluster_guard(cluster_set_rwmutex_);
+        const auto cluster_pos = cluster_set_.find(group_pos->second);
+        if (cluster_pos == cluster_set_.end())
+        {
+            LOG_ERROR("there is no cluster(id:%d) for inlong_group_id:%s in cluster_set, please check inlong_group_id/url or retry later", group_pos->second, inlong_group_id.c_str());
+            return nullptr;
+        }
+
+        // the cluster's own lock is taken here because createRandomConnForActiveSet() does not lock
+        const auto &cluster = cluster_pos->second;
+        unique_write_lock<read_write_mutex> conn_guard(cluster->rwmutex_);
+        return cluster->createRandomConnForActiveSet();
+    }
+
+    // Body of the proxylist update thread.
+    //
+    // Each round waits on cond_ for at most proxy_update_interval_ minutes.
+    // It wakes early when update_flag_ is set (a new group id was added, or
+    // shutdown was requested), leaves the loop when exit_flag_ is set, and
+    // otherwise runs doUpdate().
+    void GlobalCluster::updateSubroutine()
+    {
+        LOG_INFO("proxylist update thread start");
+
+        while (true)
+        {
+            std::unique_lock<std::mutex> con_lck(cond_mutex_);
+            const bool notified = cond_.wait_for(con_lck, std::chrono::minutes(g_config->proxy_update_interval_), [this]()
+                                                 { return update_flag_; });
+            if (notified)
+            {
+                if (exit_flag_)
+                    break;
+                update_flag_ = false;
+                con_lck.unlock();
+                LOG_DEBUG("new inlong_group_id is added, update proxylist");
+            }
+            else
+            {
+                // Release cond_mutex_ before the update on the timeout path
+                // too. The original kept it locked during doUpdate(), blocking
+                // addBuslist() and closeBuslistUpdate() for the whole duration
+                // of the proxylist requests.
+                con_lck.unlock();
+                LOG_INFO("proxy update interval is %d mins, update proxylist", g_config->proxy_update_interval_);
+            }
+            doUpdate(); // FIXME: improve, only update new groupid
+        }
+        LOG_INFO("proxylist update thread exit");
+    }
+
+    // Registers inlong_group_id in groupid2cluster_map_ (cluster id -1) when
+    // it is not known yet, and wakes the update thread so that the proxylist
+    // of the new group id gets fetched. Always returns 0.
+    int32_t GlobalCluster::addBuslist(const std::string &inlong_group_id)
+    {
+        bool already_known = false;
+        {
+            unique_read_lock<read_write_mutex> read_guard(groupid2cluster_rwmutex_);
+            already_known = (groupid2cluster_map_.find(inlong_group_id) != groupid2cluster_map_.end());
+        }
+        if (already_known)
+        {
+            return 0;
+        }
+
+        // not exist, add
+        {
+            unique_write_lock<read_write_mutex> write_guard(groupid2cluster_rwmutex_);
+            groupid2cluster_map_.emplace(inlong_group_id, -1);
+        }
+
+        // set proxy update notification
+        {
+            std::lock_guard<std::mutex> flag_guard(cond_mutex_);
+            update_flag_ = true;
+        }
+        cond_.notify_one();
+
+        LOG_DEBUG("add inlong_group_id:%s to global  bid2cluster_map, and set notify proxy updating", inlong_group_id.c_str());
+        return 0;
+    }
+
+    // Refreshes the proxylist of every group id in groupid2cluster_map_ and
+    // rewrites the on-disk cache.
+    //
+    // For each group id:
+    //   1. request the proxylist json via Utils::requestUrl, trying up to
+    //      constants::kMaxRequestTDMTimes times;
+    //   2. on failure keep the cluster already in cluster_set_ if there is
+    //      one, otherwise fall back to cache_groupid2metaInfo_, otherwise skip
+    //      the group id;
+    //   3. parse the json with parseAndGet() into a new ClusterProxyList;
+    //   4. update groupid2cluster_map_ / cluster_set_ (three cases below).
+    // Afterwards cache_groupid2metaInfo_ is written to ".proxy_list.ini.tmp",
+    // which is renamed to ".proxy_list.ini" when at least one entry exists.
+    //
+    // The write lock on groupid2cluster_rwmutex_ is held for the whole loop,
+    // including the requests.
+    // NOTE(review): the empty() check below reads groupid2cluster_map_ before
+    // that lock is taken.
+    void GlobalCluster::doUpdate()
+    {
+        if (groupid2cluster_map_.empty())
+        {
+            LOG_INFO("empty inlong_group_id, no need to update proxylist");
+            return;
+        }
+
+        // temp cache file; truncated on every update pass
+        std::ofstream outfile;
+        outfile.open(".proxy_list.ini.tmp", std::ios::out | std::ios::trunc);
+        int32_t groupId_count = 0; //flush to file, record index and count
+
+        {
+            unique_write_lock<read_write_mutex> 
wtlck(groupid2cluster_rwmutex_);
+
+            // for (auto& it : proxylist_map_)
+            for (auto &bid2cluster : groupid2cluster_map_)
+            {
+                // build the url of the tdm request
+                std::string url;
+                if (g_config->enable_proxy_URL_from_cluster_)
+                    url = g_config->proxy_cluster_URL_;
+                else
+                {
+                    url = g_config->proxy_URL_ + "/" + bid2cluster.first;
+                }
+                std::string post_data = "ip=" + g_config->ser_ip_ + 
"&version=" + constants::kTDBusCAPIVersion;
+                LOG_WARN("get inlong_group_id:%s proxy cfg url:%s, 
post_data:%s", bid2cluster.first.c_str(), url.c_str(), post_data.c_str());
+
+                // request proxylist from mananer, if failed multi-times, read 
from local cache file
+                std::string meta_data;
+                // NOTE(review): ret stays uninitialised if kMaxRequestTDMTimes is not positive
+                int32_t ret;
+                for (int i = 0; i < constants::kMaxRequestTDMTimes; i++)
+                {
+                    HttpRequest request = {url, g_config->proxy_URL_timeout_, 
g_config->need_auth_, g_config->auth_id_, g_config->auth_key_, post_data};
+                    ret = Utils::requestUrl(meta_data, &request);
+                    if (!ret)
+                    {
+                        break;
+                    } //request success
+                }
+
+                if (!ret) // success
+                {
+                    LOG_WARN("get inlong_group_id:%s proxy json list from tdm: 
%s", bid2cluster.first.c_str(), meta_data.c_str());
+                }
+                else //request manager error
+                {
+                    LOG_ERROR("failed to request inlong_group_id:%s proxylist 
from tdm, has tried max_times(%d)", bid2cluster.first.c_str(),
+                              constants::kMaxRequestTDMTimes);
+
+                    // known group id whose cluster is still in memory: keep it as is
+                    if (bid2cluster.second != -1 && 
cluster_set_.find(bid2cluster.second) != cluster_set_.end())
+                    {
+                        LOG_WARN("failed to request inlong_group_id:%s 
proxylist from tdm, use previous proxylist", bid2cluster.first.c_str());
+                        continue;
+                    }
+                    else //new groupid, try to read from cache proxylist
+                    {
+                        LOG_WARN("failed to request inlong_group_id:%s 
proxylist from tdm, also no previous proxylist, then try to find from cache 
file",
+                                 bid2cluster.first.c_str());
+                        auto it = 
cache_groupid2metaInfo_.find(bid2cluster.first);
+                        if (it != cache_groupid2metaInfo_.end())
+                        {
+                            meta_data = it->second;
+                            LOG_WARN("get inlong_group_id:%s proxy json from 
cache file: %s", bid2cluster.first.c_str(), meta_data.c_str());
+                        }
+                        else
+                        {
+                            LOG_ERROR("failed to find inlong_group_id:%s 
proxylist from cache file", bid2cluster.first.c_str());
+                            continue;
+                        }
+                    }
+                }
+
+                // meta_data now holds json from the request or from the cache
+                ClusterProxyListPtr new_proxylist_cfg = 
std::make_shared<ClusterProxyList>();
+
+                ret = parseAndGet(bid2cluster.first, meta_data, 
new_proxylist_cfg);
+                if (ret)
+                {
+                    LOG_ERROR("failed to parse inlong_group_id:%s json 
proxylist", bid2cluster.first.c_str());
+                    continue;
+                }
+
+                // cluster_set_ is modified below; this lock lasts until the end of the iteration
+                unique_write_lock<read_write_mutex> 
wtlck_cluster_set(cluster_set_rwmutex_);
+
+                cache_groupid2metaInfo_[bid2cluster.first] = meta_data; //for 
disaster
+
+                // #if 0
+
+                // case1. new groupid, but there is its clusterid in memory
+                if (bid2cluster.second == -1 && 
cluster_set_.find(new_proxylist_cfg->clusterId()) != cluster_set_.end())
+                {
+                    auto cluster_id = new_proxylist_cfg->clusterId();
+                    if 
(cluster_set_[cluster_id]->enableUpdate(new_proxylist_cfg))
+                    { // the existing cluster needs to be updated
+                        new_proxylist_cfg->initConn();
+                        cluster_set_[cluster_id] = new_proxylist_cfg;
+                        LOG_INFO("update cluster(id:%d) info and connections", 
cluster_id);
+                    }
+                    bid2cluster.second = cluster_id;
+                    LOG_INFO("add inlong_group_id:%s to bid2cluster, its 
cluster(id:%d) is already in global_cluster", bid2cluster.first.c_str(), 
cluster_id);
+                    continue;
+                }
+
+                // case2. new groupid, there is no its clusterid in memory, 
update groupid2cluster, add it into cluster
+                if (bid2cluster.second == -1 && 
cluster_set_.find(new_proxylist_cfg->clusterId()) == cluster_set_.end())
+                {
+                    bid2cluster.second = new_proxylist_cfg->clusterId();
+                    new_proxylist_cfg->initConn();
+                    cluster_set_[bid2cluster.second] = new_proxylist_cfg;
+                    LOG_INFO("add inlong_group_id:%s to cluster(id:%d) map in 
global_cluster, and init connections completely", bid2cluster.first.c_str(),
+                             new_proxylist_cfg->clusterId());
+
+                    continue;
+                }
+
+                // case3. already existing groupid, whether needs update
+                // NOTE(review): operator[] inserts a null entry if bid2cluster.second is not in cluster_set_ - confirm this cannot happen
+                if 
(cluster_set_[bid2cluster.second]->enableUpdate(new_proxylist_cfg))
+                {
+                    new_proxylist_cfg->initConn();
+                    cluster_set_[bid2cluster.second] = new_proxylist_cfg;
+                    LOG_INFO("update cluster(id:%d) info and connections", 
bid2cluster.second);
+                }
+                // #endif
+            }
+            // flush cache_bid2metaInfo to file
+            for (auto &it : cache_groupid2metaInfo_)
+            {
+                writeMetaData2File(outfile, groupId_count, it.first, 
it.second);
+                groupId_count++;
+            }
+        }
+
+        // the [main] section records how many group id sections were written
+        if (outfile)
+        {
+            if (groupId_count)
+            {
+                outfile << "[main]" << std::endl;
+                outfile << "groupId_count=" << groupId_count << std::endl;
+            }
+            outfile.close();
+        }
+        // publish the temp file as the cache only when it has content
+        if (groupId_count)
+            rename(".proxy_list.ini.tmp", ".proxy_list.ini");
+    }
+
+    // Writes one ini section for a group id to file:
+    //   [inlong_group_id<index>]
+    //   inlong_group_id=<inlong_group_id>
+    //   proxy_cfg=<meta_data>
+    // Each line ends with std::endl, i.e. the stream is flushed three times.
+    void GlobalCluster::writeMetaData2File(std::ofstream &file, int32_t groupId_index, const std::string &inlong_group_id, const std::string &meta_data)
+    {
+        // section header
+        file << "[inlong_group_id" << groupId_index << "]";
+        file << std::endl;
+
+        // group id entry
+        file << "inlong_group_id=" << inlong_group_id;
+        file << std::endl;
+
+        // cached proxylist json entry
+        file << "proxy_cfg=" << meta_data;
+        file << std::endl;
+    }
+
+    // Loads the proxylist cache file ".proxy_list.ini" into
+    // cache_groupid2metaInfo_ (group id -> proxylist json).
+    //
+    // The file holds a [main] section with groupId_count and one
+    // [inlong_group_id<i>] section per cached group id. Sections that lack
+    // either key are skipped.
+    // Returns 0 on success, 1 when the file cannot be loaded or groupId_count
+    // cannot be read.
+    int32_t GlobalCluster::readCacheBuslist()
+    {
+        IniFile ini = IniFile();
+        if (ini.load(".proxy_list.ini"))
+        {
+            LOG_INFO("there is no proxylist cache file");
+            return 1;
+        }
+        int32_t groupId_count = 0;
+        if (ini.getInt("main", "groupId_count", &groupId_count))
+        {
+            // log the real file name (the original message said ".proxylist.ini")
+            LOG_WARN("failed to parse .proxy_list.ini file");
+            return 1;
+        }
+        for (int32_t i = 0; i < groupId_count; i++)
+        {
+            std::string section = "inlong_group_id" + std::to_string(i);
+            std::string inlong_group_id, proxy;
+            if (ini.getString(section, "inlong_group_id", &inlong_group_id))
+            {
+                // inlong_group_id is still empty here, so report the section
+                // name instead (the original logged the empty string).
+                LOG_WARN("failed to get inlong_group_id name of section %s from cache file", section.c_str());
+                continue;
+            }
+            if (ini.getString(section, "proxy_cfg", &proxy))
+            {
+                LOG_WARN("failed to get %s cache proxylist", inlong_group_id.c_str());
+                continue;
+            }
+            LOG_INFO("read cache file, inlong_group_id:%s, proxy_cfg:%s", inlong_group_id.c_str(), proxy.c_str());
+            cache_groupid2metaInfo_[inlong_group_id] = proxy;
+        }
+
+        return 0;
+    }
+
+    // Parse the proxy-list metadata of one inlong group and fill proxylist_config.
+    // Example payload:
+    // {"success":true,"errMsg":null,"data":{"clusterId":1,"isIntranet":null,"isSwitch":null,"load":20,"nodeList":[{"id":1,"ip":"127.0.0.1","port":46801}]}}
+    //
+    // Mandatory: "success" == true, "data" object, non-empty "nodeList" array with
+    // at least one usable node, and a non-negative integer "clusterId".
+    // Optional: "isSwitch" and "load" default to 0, "isIntranet" defaults to true.
+    // A node is skipped (and logged) when its ip, port or id is missing or malformed.
+    //
+    // Returns 0 on success, SDKInvalidResult::kErrorParseJson otherwise.
+    // proxylist_config is only modified once all mandatory checks have passed.
+    int32_t GlobalCluster::parseAndGet(const std::string &inlong_group_id, const std::string &meta_data, ClusterProxyListPtr proxylist_config)
+    {
+        rapidjson::Document doc;
+        if (doc.Parse(meta_data.c_str()).HasParseError())
+        {
+            // cast explicitly: the error code is an enum and the offset a size_t, both printed with %d
+            LOG_ERROR("failed to parse meta_data, error:(%d:%d)", static_cast<int>(doc.GetParseError()), static_cast<int>(doc.GetErrorOffset()));
+            return SDKInvalidResult::kErrorParseJson;
+        }
+
+        // HasMember() is only valid on objects, so guard against a non-object root
+        if (!(doc.IsObject() && doc.HasMember("success") && doc["success"].IsBool() && doc["success"].GetBool()))
+        {
+            LOG_ERROR("failed to get proxy_list of inlong_group_id:%s, success: not exist or false", inlong_group_id.c_str());
+            return SDKInvalidResult::kErrorParseJson;
+        }
+        // check data valid
+        if (!doc.HasMember("data") || !doc["data"].IsObject())
+        {
+            LOG_ERROR("failed to get proxy_list of inlong_group_id:%s, data: not exist or null", inlong_group_id.c_str());
+            return SDKInvalidResult::kErrorParseJson;
+        }
+
+        // check nodelist valid
+        const rapidjson::Value &clusterInfo = doc["data"];
+        if (!clusterInfo.HasMember("nodeList") || !clusterInfo["nodeList"].IsArray())
+        {
+            LOG_ERROR("invalid nodeList of inlong_group_id:%s, not exist or null", inlong_group_id.c_str());
+            return SDKInvalidResult::kErrorParseJson;
+        }
+
+        // check nodeList isn't empty
+        const rapidjson::Value &nodeList = clusterInfo["nodeList"];
+        if (nodeList.Size() == 0)
+        {
+            LOG_ERROR("empty nodeList of inlong_group_id:%s", inlong_group_id.c_str());
+            return SDKInvalidResult::kErrorParseJson;
+        }
+        // check clusterId
+        if (!clusterInfo.HasMember("clusterId") || !clusterInfo["clusterId"].IsInt() || clusterInfo["clusterId"].GetInt() < 0)
+        {
+            LOG_ERROR("clusterId of inlong_group_id:%s is not found or not a integer", inlong_group_id.c_str());
+            return SDKInvalidResult::kErrorParseJson;
+        }
+
+        // Read an int32 member that may be encoded as a JSON number or as a decimal
+        // string. Returns false when the member is absent, null, of another type,
+        // not numeric, or outside the int32 range; never throws.
+        const auto readInt32 = [](const rapidjson::Value &node, const char *key, int32_t &out) -> bool
+        {
+            if (!node.HasMember(key))
+                return false;
+            const rapidjson::Value &field = node[key];
+            if (field.IsInt())
+            {
+                out = field.GetInt();
+                return true;
+            }
+            if (!field.IsString())
+                return false;
+            const char *text = field.GetString();
+            char *end = nullptr;
+            const long parsed = strtol(text, &end, 10);
+            if (end == text || parsed < INT32_MIN || parsed > INT32_MAX)
+                return false;
+            out = static_cast<int32_t>(parsed);
+            return true;
+        };
+
+        // proxy list: collect the usable nodes first so nothing is written to
+        // proxylist_config when the list turns out to be unusable
+        std::vector<ProxyInfoPtr> proxies;
+        proxies.reserve(nodeList.Size());
+        for (auto &proxy : nodeList.GetArray())
+        {
+            if (!proxy.IsObject() || !proxy.HasMember("ip") || !proxy["ip"].IsString())
+            {
+                LOG_ERROR("this ip info is null");
+                continue;
+            }
+            const std::string ip = proxy["ip"].GetString();
+
+            int32_t port = 0;
+            if (!readInt32(proxy, "port", port) || port <= 0 || port > 65535)
+            {
+                LOG_ERROR("this port info is null or invalid");
+                continue;
+            }
+
+            int32_t id = 0;
+            if (!readInt32(proxy, "id", id))
+            {
+                LOG_WARN("there is no id info of inlong_group_id");
+                continue;
+            }
+            proxies.push_back(std::make_shared<ProxyInfo>(id, ip, port));
+        }
+        if (proxies.empty())
+        {
+            LOG_ERROR("no valid proxy node of inlong_group_id:%s", inlong_group_id.c_str());
+            return SDKInvalidResult::kErrorParseJson;
+        }
+
+        proxylist_config->setClusterId(clusterInfo["clusterId"].GetInt());
+
+        // check isSwitch
+        if (clusterInfo.HasMember("isSwitch") && clusterInfo["isSwitch"].IsInt())
+        {
+            proxylist_config->setSwitchValue(clusterInfo["isSwitch"].GetInt());
+        }
+        else
+        {
+            LOG_WARN("switch of inlong_group_id:%s is not found or not a integer", inlong_group_id.c_str());
+            proxylist_config->setSwitchValue(0);
+        }
+        // check load
+        if (clusterInfo.HasMember("load") && clusterInfo["load"].IsInt())
+        {
+            proxylist_config->setLoad(clusterInfo["load"].GetInt());
+        }
+        else
+        {
+            LOG_WARN("load of inlong_group_id:%s is not found or not a integer", inlong_group_id.c_str());
+            proxylist_config->setLoad(0);
+        }
+        // check isIntranet
+        if (clusterInfo.HasMember("isIntranet") && clusterInfo["isIntranet"].IsInt())
+        {
+            // set the flag for both outcomes instead of relying on its previous value
+            proxylist_config->setIsInterVisit(clusterInfo["isIntranet"].GetInt() != 0);
+        }
+        else
+        {
+            LOG_WARN("isIntranet of inlong_group_id:%s is not found or not a integer", inlong_group_id.c_str());
+            proxylist_config->setIsInterVisit(true);
+        }
+
+        for (const auto &proxy_info : proxies)
+        {
+            proxylist_config->addBusAddress(proxy_info);
+        }
+        // set size: count only the nodes that were actually added
+        proxylist_config->setSize(static_cast<int32_t>(proxies.size()));
+
+        // init unused_proxylist_
+        proxylist_config->initUnusedBus();
+
+        //set active_proxy_num and backup_proxy_num
+        proxylist_config->setActiveAndBackupBusNum(g_config->max_active_proxy_num_);
+
+        return 0;
+    }
+
+    // Timer callback: for every cluster, drop its invalid connections and create
+    // the missing active/backup connections, then re-arm clear_timer_ for
+    // kClearTimerSecond seconds. Nothing is done (and the timer is not re-armed)
+    // when ec reports an error.
+    void GlobalCluster::clearInvalidConn(const std::error_code &ec)
+    {
+        if (!ec)
+        {
+            {
+                // cluster_set_ is only read here; each cluster is modified under its own write lock
+                unique_read_lock<read_write_mutex> rdlck(cluster_set_rwmutex_);
+                for (auto &entry : cluster_set_)
+                {
+                    auto &cluster = entry.second;
+                    unique_write_lock<read_write_mutex> wtlck(cluster->rwmutex_);
+                    cluster->clearInvalidConns();
+
+                    // replenish connections: active set first, then backup set
+                    for (int32_t remaining = cluster->activeBusNeedCreate(); remaining > 0; --remaining)
+                    {
+                        cluster->createRandomConnForActiveSet();
+                    }
+                    for (int32_t remaining = cluster->backupBusNeedCreate(); remaining > 0; --remaining)
+                    {
+                        cluster->createRandomConnForBackupSet();
+                    }
+                }
+            }
+
+            const auto period = std::chrono::seconds(kClearTimerSecond);
+            clear_timer_->expires_after(period);
+            clear_timer_->async_wait([this](const std::error_code &err)
+                                     { clearInvalidConn(err); });
+        }
+    }
+
+    // Timer callback: call keepConnsAlive() on every cluster, then re-arm
+    // keepAlive_timer_ for g_config->heart_beat_interval_ seconds. Nothing is
+    // done (and the timer is not re-armed) when ec reports an error.
+    void GlobalCluster::keepConnAlive(const std::error_code &ec)
+    {
+        if (!ec)
+        {
+            {
+                // the read lock is released before the timer is re-armed
+                unique_read_lock<read_write_mutex> rdlck(cluster_set_rwmutex_);
+                for (auto &entry : cluster_set_)
+                {
+                    entry.second->keepConnsAlive();
+                }
+            }
+
+            const auto period = std::chrono::seconds(g_config->heart_beat_interval_);
+            keepAlive_timer_->expires_after(period);
+            keepAlive_timer_->async_wait([this](const std::error_code &err)
+                                         { keepConnAlive(err); });
+        }
+    }
+
+    // Timer callback: call balanceConns() on every cluster, then re-arm
+    // doBalance_timer_ for kDoBalanceMin minutes. Nothing is done (and the timer
+    // is not re-armed) when ec reports an error.
+    void GlobalCluster::doBalance(const std::error_code &ec)
+    {
+        if (!ec)
+        {
+            {
+                // the read lock is released before the timer is re-armed
+                unique_read_lock<read_write_mutex> rdlck(cluster_set_rwmutex_);
+                for (auto &entry : cluster_set_)
+                {
+                    entry.second->balanceConns();
+                }
+            }
+
+            const auto period = std::chrono::minutes(kDoBalanceMin);
+            doBalance_timer_->expires_after(period);
+            doBalance_timer_->async_wait([this](const std::error_code &err)
+                                         { doBalance(err); });
+        }
+    }
+
+    // Timer callback: call updateBackupConns() on every cluster, then re-arm
+    // updateBackup_timer_ for kUpdateBackupSecond plus a randomly chosen entry of
+    // constants::kPrime seconds. Nothing is done (and the timer is not re-armed)
+    // when ec reports an error.
+    void GlobalCluster::updateBackup(const std::error_code &ec)
+    {
+        if (ec)
+            return;
+
+        {
+            // the read lock is released before the timer is re-armed
+            unique_read_lock<read_write_mutex> rdlck(cluster_set_rwmutex_);
+            for (auto &it : cluster_set_)
+            {
+                it.second->updateBackupConns();
+            }
+        }
+
+        // Seed and draw from the same generator: srand() seeds rand() and has no
+        // effect on random(), which the index was previously drawn from.
+        srand(static_cast<uint32_t>(Utils::getCurrentMsTime()));
+        const int32_t rand_idx = rand() % (constants::kPrimeSize);
+
+        updateBackup_timer_->expires_after(std::chrono::seconds(kUpdateBackupSecond + constants::kPrime[rand_idx]));
+        updateBackup_timer_->async_wait([this](const std::error_code &ec)
+                                        { updateBackup(ec); });
+    }
+
+    // Timer callback: have g_queues print its ack counters and its state, then
+    // re-arm printAckNum_timer_ for kPrintAckNumMin minutes. Nothing is done (and
+    // the timer is not re-armed) when ec reports an error.
+    void GlobalCluster::printAckNum(const std::error_code &ec)
+    {
+        if (!ec)
+        {
+            g_queues->printAck();
+            g_queues->showState();
+
+            const auto period = std::chrono::minutes(kPrintAckNumMin);
+            printAckNum_timer_->expires_after(period);
+            printAckNum_timer_->async_wait([this](const std::error_code &err)
+                                           { printAckNum(err); });
+        }
+    }
+
+} // namespace dataproxy_sdk
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.h
new file mode 100644
index 000000000..edc128679
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/proxylist_config.h
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_BUSLIST_CONFIG_H_
+#define DATAPROXY_SDK_BASE_BUSLIST_CONFIG_H_
+
+#include <asio.hpp>
+#include <chrono>
+#include <condition_variable>
+#include <fstream>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "sdk_core.h"
+#include "client_config.h"
+#include "noncopyable.h"
+#include "read_write_mutex.h"
+namespace dataproxy_sdk
+{
+  // One proxy endpoint of a cluster's node list,
+  // e.g. {"port":46801,"ip":"100.125.58.214","id":1}
+  class ProxyInfo
+  {
+  private:
+    int32_t proxy_id_;
+    std::string ip_;
+    int32_t port_num_;
+
+  public:
+    // ip is taken by value and moved into the member, so an rvalue argument is
+    // not copied a second time.
+    ProxyInfo(int32_t proxy_id, std::string ip, int32_t port_num) : proxy_id_(proxy_id), ip_(std::move(ip)), port_num_(port_num) {}
+
+    // Endpoint formatted as "[ip:port]".
+    std::string getString() const { return "[" + ip_ + ":" + std::to_string(port_num_) + "]"; }
+
+    int32_t proxyId() const { return proxy_id_; }
+
+    std::string ip() const { return ip_; }
+    void setIp(const std::string &ip) { ip_ = ip; }
+
+    int32_t portNum() const { return port_num_; }
+    void setPortNum(const int32_t &port_num) { port_num_ = port_num; }
+  };
+  //using ProxyInfoPtr = std::shared_ptr<BusInfo>;
+
+  class ClusterProxyList;
+  using ClusterProxyListPtr = std::shared_ptr<ClusterProxyList>;
+
+  // Proxy list of one cluster together with the connection sets built on it.
+  // Callers lock the public rwmutex_ themselves around modifying calls (see
+  // GlobalCluster::clearInvalidConn).
+  class ClusterProxyList
+  {
+  private:
+    int32_t cluster_id_;
+    std::vector<ProxyInfoPtr> proxylist_;
+    int32_t size_;
+    int32_t load_;
+    bool is_inter_visit_;
+    int32_t switch_value_;
+    bool get_flag_;
+    int32_t active_proxy_num_; //min(config->max_active_proxy_num_, available proxy num)
+    int32_t backup_proxy_num_; //less than (size_-active_proxy_num_-1)
+
+    std::unordered_map<std::string, ConnectionPtr> active_proxy_set_; //key:ip+port
+    std::unordered_map<std::string, ConnectionPtr> backup_proxy_set_;
+
+    //should be initialized as proxylist
+    std::unordered_map<std::string, ProxyInfoPtr> unused_proxylist_;
+
+    uint32_t msg_num_; //cumulate sent msg count
+
+  public:
+    read_write_mutex rwmutex_;
+
+  public:
+    explicit ClusterProxyList();
+    virtual ~ClusterProxyList();
+
+    virtual bool isNeedLoadBalance();
+    void addBusAddress(const ProxyInfoPtr &proxy_info);
+    bool enableUpdate(const ClusterProxyListPtr &other);
+    virtual int32_t initConn(); //init active conns and backup conns
+    int32_t clearAllConn();
+    void clearInvalidConns();
+    void keepConnsAlive();
+    void balanceConns();
+    void updateBackupConns();
+    ConnectionPtr getSendConn();
+
+    ConnectionPtr createRandomConnForActiveSet();
+    ConnectionPtr createRandomConnForBackupSet();
+
+    int32_t size() const { return size_; }
+    void setSize(const int32_t &size) { size_ = size; }
+    int32_t clusterId() const { return cluster_id_; }
+    void setClusterId(const int32_t &cluster_id) { cluster_id_ = cluster_id; }
+    int32_t switchValue() const { return switch_value_; }
+    void setSwitchValue(const int32_t &switch_value) { switch_value_ = switch_value; }
+    int32_t load() const { return load_; }
+    void setLoad(const int32_t &load) { load_ = load; }
+    bool isInterVisit() const { return is_inter_visit_; }
+    void setIsInterVisit(bool is_inter_visit) { is_inter_visit_ = is_inter_visit; }
+    bool getFlag() const { return get_flag_; }
+    void setGetFlag(bool get_flag) { get_flag_ = get_flag; }
+    int32_t activeBusNum() const { return active_proxy_num_; }
+    void setActiveAndBackupBusNum(const int32_t &default_set);
+    int32_t backupBusNum() const { return backup_proxy_num_; }
+
+    // Number of backup connections still missing to reach backup_proxy_num_
+    // (0 when the set is already large enough). Compared as signed values so a
+    // negative target cannot turn into a huge unsigned number.
+    int32_t backupBusNeedCreate() const
+    {
+      const int32_t current = static_cast<int32_t>(backup_proxy_set_.size());
+      return current < backup_proxy_num_ ? backup_proxy_num_ - current : 0;
+    }
+    // Number of active connections still missing to reach active_proxy_num_
+    // (0 when the set is already large enough).
+    int32_t activeBusNeedCreate() const
+    {
+      const int32_t current = static_cast<int32_t>(active_proxy_set_.size());
+      return current < active_proxy_num_ ? active_proxy_num_ - current : 0;
+    }
+    void initUnusedBus();
+
+  private:
+    ConnectionPtr createRandomConn(std::unordered_map<std::string, ConnectionPtr> &conn_set);
+    int32_t createConnSet();
+
+  };
+
+  // Owns every cluster's proxy list, the inlong_group_id -> cluster mapping, the
+  // local proxy-list cache and the periodic maintenance timers.
+  class GlobalCluster : noncopyable
+  {
+  private:
+    std::unordered_map<std::string, int32_t> groupid2cluster_map_;   //<inlong_group_id,cluster_id>
+    std::unordered_map<int32_t, ClusterProxyListPtr> cluster_set_; //key:cluster_id
+
+    std::unordered_map<std::string, std::string> cache_groupid2metaInfo_; //<inlong_group_id,proxylist>, for disaster, read from local cache file
+
+    read_write_mutex groupid2cluster_rwmutex_;
+    read_write_mutex cluster_set_rwmutex_;
+
+    bool update_flag_;
+    std::mutex cond_mutex_;
+    std::condition_variable cond_;
+    bool exit_flag_; //exit proxylist update thread
+    std::thread worker_;
+    ExecutorThreadPtr timer_worker_; //clean invalid conn, send hb, and do balance between activeconn and backupconn
+
+    // Each timer is re-armed by its own callback below.
+    SteadyTimerPtr clear_timer_;
+    SteadyTimerPtr keepAlive_timer_;
+    SteadyTimerPtr doBalance_timer_; // do balance switch between active and backup
+    SteadyTimerPtr printAckNum_timer_;
+    SteadyTimerPtr updateBackup_timer_;
+
+    enum
+    {
+      kClearTimerSecond = 10, // FIXME: whether need modification
+      kDoBalanceMin = 5,
+      kPrintAckNumMin = 1,       // ack msg in one min cycle
+      kUpdateBackupSecond = 300, // same period as kDoBalanceMin, in seconds
+    };
+
+  public:
+    explicit GlobalCluster();
+    // virtual: the class exposes virtual members, so deleting a derived object
+    // through a GlobalCluster pointer must run the derived destructor
+    virtual ~GlobalCluster();
+
+    int32_t initBuslistAndCreateConns();
+    void startUpdateSubroutine();
+    int32_t addBuslist(const std::string &inlong_group_id);
+    virtual ConnectionPtr getSendConn(const std::string &inlong_group_id);
+    ConnectionPtr createActiveConn(const std::string &inlong_group_id, int32_t pool_id);
+    void closeBuslistUpdate();
+    // Loads ".proxy_list.ini" into cache_groupid2metaInfo_; returns 0 on success, 1 otherwise.
+    int32_t readCacheBuslist();
+
+  private:
+    void updateSubroutine();//update proxylist
+    void doUpdate();
+    // Parses proxy-list JSON into proxylist_config; returns 0 or SDKInvalidResult::kErrorParseJson.
+    int32_t parseAndGet(const std::string &inlong_group_id, const std::string &meta_data, ClusterProxyListPtr proxylist_config);
+    // Writes one "[inlong_group_id<groupId_index>]" section to the cache file.
+    void writeMetaData2File(std::ofstream &file, int32_t groupId_index, const std::string &inlong_group_id, const std::string &meta_data);
+
+    // Timer callbacks: each returns immediately when ec reports an error,
+    // otherwise does its work and re-arms its timer.
+    void clearInvalidConn(const std::error_code &ec);
+    void keepConnAlive(const std::error_code &ec); // send hb
+    void doBalance(const std::error_code &ec);
+    void updateBackup(const std::error_code &ec);
+    virtual void printAckNum(const std::error_code &ec);
+    // void cancelAllTimer();
+  };
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_BUSLIST_CONFIG_H_
\ No newline at end of file

Reply via email to