This is an automated email from the ASF dual-hosted git repository.
vinner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 73881b4b12 [INLONG-10861][SDK] Optimize the coredump caused by the
DataProxy C++ SDK (#10862)
73881b4b12 is described below
commit 73881b4b12ffe4939e9b14bd155f576398b77f22
Author: doleyzi <[email protected]>
AuthorDate: Fri Aug 23 10:08:18 2024 +0800
[INLONG-10861][SDK] Optimize the coredump caused by the DataProxy C++ SDK
(#10862)
---
.../dataproxy-sdk-cpp/src/core/api_imp.cc | 23 +++++-------
.../dataproxy-sdk-cpp/src/core/api_imp.h | 5 ++-
.../dataproxy-sdk-cpp/src/manager/buffer_manager.h | 11 +++++-
.../dataproxy-sdk-cpp/src/manager/metric_manager.h | 18 +++++++---
.../dataproxy-sdk-cpp/src/manager/msg_manager.h | 12 +++++--
.../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 4 +++
.../dataproxy-sdk-cpp/src/utils/utils.cc | 42 ++++++----------------
7 files changed, 58 insertions(+), 57 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
index 2109af502e..a9522eee69 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
@@ -34,8 +34,6 @@ int32_t ApiImp::InitApi(const char *config_file_path) {
return SdkCode::kMultiInit;
}
- user_exit_flag_.getAndSet(0);
-
if (!SdkConfig::getInstance()->ParseConfig(config_file_path)) {
return SdkCode::kErrorInit;
}
@@ -48,6 +46,9 @@ int32_t ApiImp::InitApi(const char *config_file_path) {
int32_t ApiImp::Send(const char *inlong_group_id, const char
*inlong_stream_id, const char *msg, int32_t msg_len,
UserCallBack call_back) {
+ if (inited_ == false || exit_flag_) {
+ return SdkCode::kSendBeforeInit;
+ }
int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len);
if(code !=SdkCode::kSuccess){
return code;
@@ -57,6 +58,9 @@ int32_t ApiImp::Send(const char *inlong_group_id, const char
*inlong_stream_id,
}
int32_t ApiImp::Send(const char *inlong_group_id, const char
*inlong_stream_id, const char *msg, int32_t msg_len,
int64_t data_time, UserCallBack call_back) {
+ if (inited_ == false || exit_flag_) {
+ return SdkCode::kSendBeforeInit;
+ }
int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len);
if(code !=SdkCode::kSuccess){
return code;
@@ -73,10 +77,6 @@ int32_t ApiImp::ValidateParams(const char *inlong_group_id,
const char *inlong_s
if (inlong_group_id == nullptr || inlong_stream_id == nullptr || msg ==
nullptr || msg_len <= 0) {
return SdkCode::kInvalidInput;
}
-
- if (inited_ == false) {
- return SdkCode::kSendBeforeInit;
- }
return SdkCode::kSuccess;
}
@@ -99,10 +99,7 @@ int32_t ApiImp::SendBase(const std::string& inlong_group_id,
const std::string&
}
int32_t ApiImp::CloseApi(int32_t max_waitms) {
- if (!__sync_bool_compare_and_swap(&init_flag_, false, true)) {
- LOG_ERROR("sdk has been closed! .");
- return SdkCode::kMultiExits;
- }
+ exit_flag_ = true;
std::this_thread::sleep_for(std::chrono::milliseconds(max_waitms));
return SdkCode::kSuccess;
}
@@ -126,8 +123,7 @@ int32_t ApiImp::DoInit() {
}
int32_t ApiImp::CheckData(const std::string& inlong_group_id, const
std::string& inlong_stream_id, const std::string& msg) {
- if (init_succeed_ == 0 || user_exit_flag_.get() == 1) {
- LOG_ERROR("capi has been closed, Init first and then send");
+ if (!init_succeed_) {
return SdkCode::kSendAfterClose;
}
@@ -154,14 +150,13 @@ int32_t ApiImp::InitManager() {
recv_manager_ = std::make_shared<RecvManager>(send_manager_);
if (!recv_manager_) {
- LOG_ERROR("fail to Init global packqueue");
return SdkCode::kErrorInit;
}
init_succeed_ = true;
return SdkCode::kSuccess;
}
int32_t ApiImp::AddInLongGroupId(const std::vector<std::string> &group_ids) {
- if (inited_ == false) {
+ if (!inited_) {
return SdkCode::kSendBeforeInit;
}
for (auto group_id : group_ids) {
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
index cd6a9757a4..9671d8fc2b 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h
@@ -53,11 +53,10 @@ class ApiImp {
int32_t ValidateParams(const char *inlong_group_id, const char
*inlong_stream_id, const char *msg, int32_t msg_len);
- AtomicInt user_exit_flag_{0};
- volatile bool init_flag_ = false;
volatile bool inited_ = false;
volatile bool init_succeed_ = false;
- AtomicInt buf_full_{0};
+ volatile bool exit_flag_ = false;
+
uint32_t max_msg_length_;
std::string local_ip_;
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h
index 428188e38e..ed0c253baa 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/buffer_manager.h
@@ -31,6 +31,7 @@ class BufferManager {
std::queue<SendBufferPtrT> buffer_queue_;
mutable std::mutex mutex_;
uint32_t queue_limit_;
+ bool exit_= false;
BufferManager() {
uint32_t data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_,
SdkConfig::getInstance()->pack_size_);
@@ -54,6 +55,11 @@ class BufferManager {
AddSendBuffer(send_buffer);
}
}
+ ~BufferManager(){
+ std::lock_guard<std::mutex> lck(mutex_);
+ exit_ = true;
+ LOG_INFO("Buffer manager exited");
+ }
public:
static BufferManager *GetInstance() {
@@ -61,6 +67,9 @@ class BufferManager {
return &instance;
}
SendBufferPtrT GetSendBuffer() {
+ if(exit_){
+ return nullptr;
+ }
std::lock_guard<std::mutex> lck(mutex_);
if (buffer_queue_.empty()) {
return nullptr;
@@ -70,7 +79,7 @@ class BufferManager {
return buf;
}
void AddSendBuffer(const SendBufferPtrT &send_buffer) {
- if (nullptr == send_buffer) {
+ if (nullptr == send_buffer || exit_) {
return;
}
send_buffer->releaseBuf();
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h
index 5dc013f2d5..58b9481e0f 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h
@@ -24,6 +24,7 @@
#include "../config/sdk_conf.h"
#include "../metric/environment.h"
#include "../metric/metric.h"
+#include "../utils/logger.h"
#ifndef INLONG_METRIC_MANAGER_H
#define INLONG_METRIC_MANAGER_H
@@ -40,10 +41,6 @@ class MetricManager {
Environment environment_;
std::string coreParma_;
- MetricManager() {
-
- }
-
public:
static MetricManager *GetInstance() {
static MetricManager instance;
@@ -54,23 +51,35 @@ class MetricManager {
void PrintMetric();
void Run();
void UpdateMetric(const std::string &stat_key, Metric &stat) {
+ if(!running_){
+ return;
+ }
std::lock_guard<std::mutex> lck(mutex_);
stat_map_[stat_key].Update(stat);
}
void AddReceiveBufferFullCount(const std::string &inlong_group_id, const
std::string &inlong_stream_id,uint64_t count) {
+ if(!running_){
+ return;
+ }
std::lock_guard<std::mutex> lck(mutex_);
std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id);
stat_map_[stat_key].AddReceiveBufferFullCount(count);
}
void AddTooLongMsgCount(const std::string &inlong_group_id, const
std::string &inlong_stream_id,uint64_t count) {
+ if (!running_) {
+ return;
+ }
std::lock_guard<std::mutex> lck(mutex_);
std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id);
stat_map_[stat_key].AddTooLongMsgCount(count);
}
void AddMetadataFailCount(const std::string &inlong_group_id, const
std::string &inlong_stream_id,uint64_t count) {
+ if (!running_) {
+ return;
+ }
std::lock_guard<std::mutex> lck(mutex_);
std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id);
stat_map_[stat_key].AddMetadataFailCount(count);
@@ -87,6 +96,7 @@ class MetricManager {
if (update_thread_.joinable()) {
update_thread_.join();
}
+ LOG_INFO("Metric manager exited");
}
};
} // namespace inlong
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/msg_manager.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/msg_manager.h
index 26aa387083..c304ead157 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/msg_manager.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/msg_manager.h
@@ -32,6 +32,7 @@ class MsgManager {
mutable std::mutex mutex_;
uint32_t queue_limit_;
bool enable_share_msg_;
+ bool exit_= false;
MsgManager() {
uint32_t data_capacity_ =
std::max(SdkConfig::getInstance()->max_msg_size_,
SdkConfig::getInstance()->pack_size_);
uint32_t buffer_num = SdkConfig::getInstance()->recv_buf_size_ /
data_capacity_;
@@ -48,6 +49,11 @@ class MsgManager {
AddMsg(msg_ptr);
}
}
+ ~MsgManager(){
+ std::lock_guard<std::mutex> lck(mutex_);
+ exit_ = true;
+ LOG_INFO("Msg manager exited");
+ }
public:
static MsgManager *GetInstance() {
@@ -55,7 +61,7 @@ class MsgManager {
return &instance;
}
SdkMsgPtr GetMsg() {
- if (!enable_share_msg_) {
+ if (!enable_share_msg_ || exit_) {
return nullptr;
}
std::lock_guard<std::mutex> lck(mutex_);
@@ -67,7 +73,7 @@ class MsgManager {
return buf;
}
void AddMsg(const SdkMsgPtr &msg_ptr) {
- if (nullptr == msg_ptr || !enable_share_msg_) {
+ if (nullptr == msg_ptr || !enable_share_msg_ || exit_) {
return;
}
std::lock_guard<std::mutex> lck(mutex_);
@@ -78,7 +84,7 @@ class MsgManager {
}
void AddMsg(const std::vector<SdkMsgPtr> &user_msg_vector) {
- if (!enable_share_msg_) {
+ if (!enable_share_msg_ || exit_) {
return;
}
std::lock_guard<std::mutex> lck(mutex_);
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index 3db331aeda..c84435ea89 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -21,6 +21,7 @@
#include "api_code.h"
#include <fstream>
+#include <curl/curl.h>
#include "../config/ini_help.h"
#include "../utils/capi_constant.h"
@@ -41,11 +42,14 @@ ProxyManager::~ProxyManager() {
if (update_conf_thread_.joinable()) {
update_conf_thread_.join();
}
+
+ curl_global_cleanup();
}
void ProxyManager::Init() {
timeout_ = SdkConfig::getInstance()->manager_url_timeout_;
last_update_time_ = Utils::getCurrentMsTime();
if (__sync_bool_compare_and_swap(&inited_, false, true)) {
+ curl_global_init(CURL_GLOBAL_ALL);
ReadLocalCache();
update_conf_thread_ = std::thread(&ProxyManager::Update, this);
}
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc
index b3856b440b..7fe5ba4dc5 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc
@@ -309,7 +309,6 @@ int32_t Utils::requestUrl(const std::string &url,
std::string &urlByDNS,
}
CURL *curl = NULL;
- curl_global_init(CURL_GLOBAL_ALL);
curl = curl_easy_init();
if (!curl) {
@@ -330,9 +329,7 @@ int32_t Utils::requestUrl(const std::string &url,
std::string &urlByDNS,
LOG_INFO("request from tdm:" << res);
if (ret != 0) {
LOG_ERROR("failed to request data from " << urlByDNS);
- if (curl)
- curl_easy_cleanup(curl);
- curl_global_cleanup();
+ if (curl) curl_easy_cleanup(curl);
return SdkCode::kErrorCURL;
}
@@ -341,25 +338,17 @@ int32_t Utils::requestUrl(const std::string &url,
std::string &urlByDNS,
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &code);
if (code != 200) {
LOG_ERROR("tdm responsed with code " << code);
- if (curl)
- curl_easy_cleanup(curl);
- curl_global_cleanup();
-
+ if (curl) curl_easy_cleanup(curl);
return SdkCode::kErrorCURL;
}
if (res.empty()) {
LOG_ERROR("tdm return empty data");
- if (curl)
- curl_easy_cleanup(curl);
- curl_global_cleanup();
-
+ if (curl) curl_easy_cleanup(curl);
return SdkCode::kErrorCURL;
}
- if (curl)
- curl_easy_cleanup(curl);
- curl_global_cleanup();
+ if (curl) curl_easy_cleanup(curl);
return 0;
}
@@ -406,8 +395,6 @@ int32_t Utils::requestUrl(std::string &res, const
HttpRequest *request) {
CURL *curl = NULL;
struct curl_slist *list = NULL;
- curl_global_init(CURL_GLOBAL_ALL);
-
curl = curl_easy_init();
if (!curl) {
LOG_ERROR("failed to init curl object");
@@ -415,8 +402,7 @@ int32_t Utils::requestUrl(std::string &res, const
HttpRequest *request) {
}
// http header
- list = curl_slist_append(list,
- "Content-Type: application/x-www-form-urlencoded");
+ list = curl_slist_append(list,"Content-Type:
application/x-www-form-urlencoded");
if (request->need_auth && !request->auth_id.empty() &&
!request->auth_key.empty()) {
@@ -446,9 +432,7 @@ int32_t Utils::requestUrl(std::string &res, const
HttpRequest *request) {
if (ret != 0) {
LOG_ERROR(curl_easy_strerror(ret));
LOG_ERROR("failed to request data from " << request->url.c_str());
- if (curl)
- curl_easy_cleanup(curl);
- curl_global_cleanup();
+ if (curl) curl_easy_cleanup(curl);
return SdkCode::kErrorCURL;
}
@@ -456,26 +440,20 @@ int32_t Utils::requestUrl(std::string &res, const
HttpRequest *request) {
int32_t code;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &code);
if (code != 200) {
- LOG_ERROR("tdm responsed with code " << code);
- if (curl)
- curl_easy_cleanup(curl);
- curl_global_cleanup();
+ if (curl) curl_easy_cleanup(curl);
return SdkCode::kErrorCURL;
}
if (res.empty()) {
- LOG_ERROR("tdm return empty data");
- if (curl)
- curl_easy_cleanup(curl);
- curl_global_cleanup();
+ LOG_ERROR("Empty response");
+ if (curl) curl_easy_cleanup(curl);
return SdkCode::kErrorCURL;
}
- // clean work
+ // Clean work
curl_easy_cleanup(curl);
- curl_global_cleanup();
return 0;
}