This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f01d49347e [INLONG-10780][SDK] Optimize memory management for
DataProxy CPP SDK (#10792)
f01d49347e is described below
commit f01d49347e6932dea5ae95b37111a5f9bdde0244
Author: doleyzi <[email protected]>
AuthorDate: Wed Aug 14 16:33:32 2024 +0800
[INLONG-10780][SDK] Optimize memory management for DataProxy CPP SDK
(#10792)
---
.../dataproxy-sdk-cpp/src/config/sdk_conf.cc | 46 +++++++++---
.../dataproxy-sdk-cpp/src/config/sdk_conf.h | 3 +
.../dataproxy-sdk-cpp/src/manager/buffer_manager.h | 85 ++++++++++++++++++++++
.../dataproxy-sdk-cpp/src/utils/capi_constant.h | 2 +
4 files changed, 126 insertions(+), 10 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index 319262adc2..68cc122c4c 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -38,6 +38,9 @@ bool SdkConfig::ParseConfig(const std::string &config_path) {
// Guaranteed to only parse the configuration file once
if (!__sync_bool_compare_and_swap(&parsed_, false, true)) {
LOG_INFO("ParseConfig has parsed .");
+ if (++instance_num_ > max_instance_) {
+ return false;
+ }
return true;
}
@@ -92,7 +95,7 @@ void SdkConfig::defaultInit() {
load_balance_interval_ = constants::kLoadBalanceInterval;
heart_beat_interval_ = constants::kHeartBeatInterval;
enable_balance_ = constants::kEnableBalance;
- isolation_level_=constants::IsolationLevel::kLevelSecond;
+ isolation_level_ = constants::IsolationLevel::kLevelSecond;
// cache parameter
send_buf_size_ = constants::kSendBufSize;
@@ -132,6 +135,10 @@ void SdkConfig::defaultInit() {
enable_setaffinity_ = constants::kEnableSetAffinity;
mask_cpu_affinity_ = constants::kMaskCPUAffinity;
extend_field_ = constants::kExtendField;
+
+ need_auth_ = constants::kNeedAuth;
+ max_instance_ = constants::kMaxInstance;
+ instance_num_ = 1;
}
void SdkConfig::InitThreadParam(const rapidjson::Value &doc) {
@@ -212,6 +219,14 @@ void SdkConfig::InitCacheParam(const rapidjson::Value
&doc) {
} else {
max_stream_id_num_ = constants::kMaxGroupIdNum;
}
+
+ // max_cache_num
+ if (doc.HasMember("max_cache_num") && doc["max_cache_num"].IsInt() &&
doc["max_cache_num"].GetInt() >= 0) {
+ const rapidjson::Value &obj = doc["max_cache_num"];
+ max_cache_num_ = obj.GetInt();
+ } else {
+ max_cache_num_ = constants::kMaxCacheNum;
+ }
}
void SdkConfig::InitZipParam(const rapidjson::Value &doc) {
@@ -431,9 +446,10 @@ void SdkConfig::InitAuthParm(const rapidjson::Value &doc) {
} else {
need_auth_ = constants::kNeedAuth;
LOG_INFO("need_auth is not expect, then use default:%s" << need_auth_
- ? "true"
- : "false");
+ ? "true"
+ : "false");
}
+
}
void SdkConfig::OthersParam(const rapidjson::Value &doc) {
// ser_ip
@@ -475,12 +491,20 @@ void SdkConfig::OthersParam(const rapidjson::Value &doc) {
} else {
extend_field_ = constants::kExtendField;
}
+
+ // instance num
+ if (doc.HasMember("max_instance") && doc["max_instance"].IsInt() &&
doc["max_instance"].GetInt() > 0) {
+ const rapidjson::Value &obj = doc["max_instance"];
+ max_instance_ = obj.GetInt();
+ } else {
+ max_instance_ = constants::kMaxInstance;
+ }
}
-bool SdkConfig::GetLocalIPV4Address(std::string& err_info, std::string&
localhost) {
+bool SdkConfig::GetLocalIPV4Address(std::string &err_info, std::string
&localhost) {
int32_t sockfd;
int32_t ip_num = 0;
- char buf[1024] = {0};
+ char buf[1024] = {0};
struct ifreq *ifreq;
struct ifreq if_flag;
struct ifconf ifconf;
@@ -493,7 +517,7 @@ bool SdkConfig::GetLocalIPV4Address(std::string& err_info,
std::string& localhos
}
ioctl(sockfd, SIOCGIFCONF, &ifconf);
- ifreq = (struct ifreq *)buf;
+ ifreq = (struct ifreq *) buf;
ip_num = ifconf.ifc_len / sizeof(struct ifreq);
for (int32_t i = 0; i < ip_num; i++, ifreq++) {
if (ifreq->ifr_flags != AF_INET) {
@@ -511,11 +535,11 @@ bool SdkConfig::GetLocalIPV4Address(std::string&
err_info, std::string& localhos
continue;
}
- if (!strncmp(inet_ntoa(((struct
sockaddr_in*)&(ifreq->ifr_addr))->sin_addr),
+ if (!strncmp(inet_ntoa(((struct sockaddr_in *)
&(ifreq->ifr_addr))->sin_addr),
"127.0.0.1", 7)) {
continue;
}
- localhost = inet_ntoa(((struct sockaddr_in*)&(ifreq->ifr_addr))->sin_addr);
+ localhost = inet_ntoa(((struct sockaddr_in *)
&(ifreq->ifr_addr))->sin_addr);
close(sockfd);
err_info = "Ok";
return true;
@@ -545,8 +569,8 @@ void SdkConfig::ShowClientConfig() {
LOG_INFO("manager_cluster_url: " << manager_cluster_url_.c_str());
LOG_INFO(
"enable_manager_url_from_cluster: " << enable_manager_url_from_cluster_
- ? "true"
- : "false");
+ ? "true"
+ : "false");
LOG_INFO("manager_update_interval: minutes" << manager_update_interval_);
LOG_INFO("manager_url_timeout: " << manager_url_timeout_);
LOG_INFO("max_tcp_num: " << max_proxy_num_);
@@ -566,6 +590,8 @@ void SdkConfig::ShowClientConfig() {
LOG_INFO("max_group_id_num: " << max_group_id_num_);
LOG_INFO("max_stream_id_num: " << max_stream_id_num_);
LOG_INFO("isolation_level: " << isolation_level_);
+ LOG_INFO("max_instance: " << max_instance_);
+ LOG_INFO("max_cache_num: " << max_cache_num_);
}
} // namespace inlong
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
index 6d7b23dc21..f8581a1198 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h
@@ -50,6 +50,9 @@ private:
uint32_t send_buf_size_; // Send buf size, bid granularity
uint32_t max_group_id_num_; // Send buf size, bid granularity
uint32_t max_stream_id_num_; // Send buf size, bid granularity
+ uint32_t max_cache_num_;
+ uint32_t max_instance_;
+ uint32_t instance_num_;
// thread parameters
uint32_t per_groupid_thread_nums_; // Sending thread per groupid
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h
new file mode 100644
index 0000000000..428188e38e
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <queue>
+
+#include "../config/sdk_conf.h"
+#include "../utils/send_buffer.h"
+
+#ifndef INLONG_BUFFER_MANAGER_H
+#define INLONG_BUFFER_MANAGER_H
+
+namespace inlong {
+class BufferManager {
+ private:
+ std::queue<SendBufferPtrT> buffer_queue_;
+ mutable std::mutex mutex_;
+ uint32_t queue_limit_;
+ BufferManager() {
+ uint32_t data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_,
+ SdkConfig::getInstance()->pack_size_);
+ uint32_t buffer_num =
+ (SdkConfig::getInstance()->recv_buf_size_ / data_capacity_) *
+ SdkConfig::getInstance()->instance_num_;
+ queue_limit_ =
+ std::min(SdkConfig::getInstance()->max_cache_num_, buffer_num);
+ LOG_INFO("Data capacity:"
+ << data_capacity_ << ", buffer num: " << buffer_num
+ << ", instance num: " <<
SdkConfig::getInstance()->instance_num_
+ << ", limit: " << queue_limit_ << " ,max cache num: "
+ << SdkConfig::getInstance()->max_cache_num_);
+ for ( uint32_t index = 0; index < queue_limit_; index++) {
+ std::shared_ptr<SendBuffer> send_buffer =
+ std::make_shared<SendBuffer>(data_capacity_);
+ if (send_buffer == nullptr) {
+ LOG_INFO("Buffer manager is null");
+ continue;
+ }
+ AddSendBuffer(send_buffer);
+ }
+ }
+
+ public:
+ static BufferManager *GetInstance() {
+ static BufferManager instance;
+ return &instance;
+ }
+ SendBufferPtrT GetSendBuffer() {
+ std::lock_guard<std::mutex> lck(mutex_);
+ if (buffer_queue_.empty()) {
+ return nullptr;
+ }
+ SendBufferPtrT buf = buffer_queue_.front();
+ buffer_queue_.pop();
+ return buf;
+ }
+ void AddSendBuffer(const SendBufferPtrT &send_buffer) {
+ if (nullptr == send_buffer) {
+ return;
+ }
+ send_buffer->releaseBuf();
+ std::lock_guard<std::mutex> lck(mutex_);
+ if (buffer_queue_.size() > queue_limit_) {
+ return;
+ }
+ buffer_queue_.emplace(send_buffer);
+ }
+};
+} // namespace inlong
+#endif // INLONG_BUFFER_MANAGER_H
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index 1dbebd03db..399bd1b348 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -41,6 +41,8 @@ static const int32_t kSendBufSize = 10240000;
static const int32_t kRecvBufSize = 10240000;
static const uint32_t kMaxGroupIdNum = 50;
static const uint32_t kMaxStreamIdNum = 100;
+static const uint32_t kMaxCacheNum = 10;
+static const uint32_t kMaxInstance = 30;
static const int32_t kDispatchIntervalZip = 8;
static const int32_t kDispatchIntervalSend = 10;