This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ab3f2cc416 [INLONG-9293][SDK] Optimize the problem that the more
inlong grouids there are, the more memory is consumed. (#9295)
ab3f2cc416 is described below
commit ab3f2cc4162dfe633d5dabf33f0f4c828c0d2e9d
Author: doleyzi <[email protected]>
AuthorDate: Thu Nov 16 09:40:20 2023 +0800
[INLONG-9293][SDK] Optimize the problem that the more inlong grouids there
are, the more memory is consumed. (#9295)
---
.../dataproxy-sdk-cpp/release/inc/api_code.h | 2 +-
.../dataproxy-sdk-cpp/release/inc/sdk_msg.h | 10 ++-
.../dataproxy-sdk-cpp/src/core/api_imp.cc | 2 +-
.../dataproxy-sdk-cpp/src/group/recv_group.cc | 93 ++++++++++++----------
.../dataproxy-sdk-cpp/src/group/recv_group.h | 13 +--
.../dataproxy-sdk-cpp/src/group/send_group.cc | 4 +-
.../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 2 +-
.../dataproxy-sdk-cpp/src/manager/proxy_manager.h | 2 +-
.../dataproxy-sdk-cpp/src/manager/recv_manager.cc | 19 +++--
.../dataproxy-sdk-cpp/src/manager/recv_manager.h | 2 +-
.../dataproxy-sdk-cpp/src/manager/send_manager.cc | 10 +--
.../dataproxy-sdk-cpp/src/utils/send_buffer.h | 2 +-
12 files changed, 87 insertions(+), 74 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/api_code.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/api_code.h
index 227f6f9e24..0228f9c96d 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/api_code.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/api_code.h
@@ -31,7 +31,7 @@ enum SdkCode {
kInvalidBid = 12,
kFailGetBufferPool = 13,
kFailGetSendBuf = 14,
- kFailWriteToBuf = 15,
+ kMsgEmpty = 15,
kErrorCURL = 16,
kErrorParseJson = 17,
kFailGetRevGroup = 18,
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_msg.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_msg.h
index ae77b7efcf..df7e90f4ac 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_msg.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_msg.h
@@ -36,12 +36,18 @@ struct SdkMsg {
std::string user_client_ip_;
std::string data_pack_format_attr_;
+
+ std::string inlong_group_id_;
+ std::string inlong_stream_id_;
+
SdkMsg(const std::string &mmsg, const std::string &mclient_ip,
int64_t mreport_time, UserCallBack mcb, const std::string &attr,
- const std::string &u_ip, int64_t u_time)
+ const std::string &u_ip, int64_t u_time,const std::string&
inlong_group_id,const std::string& inlong_stream_id)
: msg_(mmsg), client_ip_(mclient_ip), report_time_(mreport_time),
cb_(mcb), user_report_time_(u_time), user_client_ip_(u_ip),
- data_pack_format_attr_(attr) {}
+ data_pack_format_attr_(attr),
+ inlong_group_id_(inlong_group_id),
+ inlong_stream_id_(inlong_stream_id){}
};
using SdkMsgPtr = std::shared_ptr<SdkMsg>;
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
index fd5b1b8312..f058142841 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
@@ -75,7 +75,7 @@ int32_t ApiImp::SendBase(const std::string inlong_group_id,
ProxyManager::GetInstance()->CheckBidConf(inlong_group_id, true);
auto recv_group =
- recv_manager_->GetRecvGroup(inlong_group_id, inlong_stream_id);
+ recv_manager_->GetRecvGroup(inlong_group_id);
if (recv_group == nullptr) {
LOG_ERROR("fail to get recv group, inlong_group_id:"
<< inlong_group_id << " inlong_stream_id:" << inlong_stream_id);
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
index 9d41942e97..79931bd434 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
@@ -26,25 +26,22 @@
namespace inlong {
const uint32_t DEFAULT_PACK_ATTR = 400;
-RecvGroup::RecvGroup(const std::string &inlong_group_id,
- const std::string &inlong_stream_id,
- std::shared_ptr<SendManager> send_manager)
- : cur_len_(0), inlong_group_id_(inlong_group_id),
- inlong_stream_id_(inlong_stream_id), groupId_num_(0), streamId_num_(0),
+RecvGroup::RecvGroup(const std::string &group_key,std::shared_ptr<SendManager>
send_manager)
+ : cur_len_(0), groupId_num_(0), streamId_num_(0),
msg_type_(SdkConfig::getInstance()->msg_type_),
data_capacity_(SdkConfig::getInstance()->buf_size_),
- send_manager_(send_manager) {
+ send_manager_(send_manager),group_key_(group_key) {
data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_,
SdkConfig::getInstance()->pack_size_);
data_capacity_ = data_capacity_ + DEFAULT_PACK_ATTR;
pack_buf_ = new char[data_capacity_];
memset(pack_buf_, 0x0, data_capacity_);
- topic_desc_ =
- "groupId=" + inlong_group_id_ + "&streamId=" + inlong_stream_id_;
data_time_ = 0;
last_pack_time_ = Utils::getCurrentMsTime();
max_recv_size_ = SdkConfig::getInstance()->recv_buf_size_;
+
+
LOG_INFO("RecvGroup:"<<group_key_<<",data_capacity:"<<data_capacity_<<",max_recv_size:"<<max_recv_size_);
}
RecvGroup::~RecvGroup() {
@@ -64,7 +61,7 @@ int32_t RecvGroup::SendData(const std::string &msg, const
std::string &groupId,
return SdkCode::kRecvBufferFull;
}
- AddMsg(msg, client_ip, report_time, call_back);
+ AddMsg(msg, client_ip, report_time, call_back,groupId,streamId);
return SdkCode::kSuccess;
}
@@ -72,15 +69,15 @@ int32_t RecvGroup::SendData(const std::string &msg, const
std::string &groupId,
int32_t RecvGroup::DoDispatchMsg() {
last_pack_time_ = Utils::getCurrentMsTime();
std::lock_guard<std::mutex> lck(mutex_);
- if (inlong_group_id_.empty()) {
+ if (group_key_.empty()) {
LOG_ERROR("groupId is empty, check!!");
return SdkCode::kInvalidInput;
}
if (msgs_.empty()) {
LOG_ERROR("no msg in msg_set, check!");
- return SdkCode::kFailGetRevGroup;
+ return SdkCode::kMsgEmpty;
}
- auto send_group = send_manager_->GetSendGroup(inlong_group_id_);
+ auto send_group = send_manager_->GetSendGroup(group_key_);
if (send_group == nullptr) {
LOG_ERROR("failed to get send_buf, something gets wrong, checkout!");
return SdkCode::kFailGetSendBuf;
@@ -93,38 +90,54 @@ int32_t RecvGroup::DoDispatchMsg() {
}
uint32_t total_length = 0;
- std::vector<SdkMsgPtr> msgs_to_dispatch;
+ uint64_t max_tid_size = 0;
+ std::unordered_map<std::string, std::vector<SdkMsgPtr>> msgs_to_dispatch;
+ std::unordered_map<std::string, uint64_t> tid_stat;
while (!msgs_.empty()) {
SdkMsgPtr msg = msgs_.front();
- if (msg->msg_.size() + total_length + constants::ATTR_LENGTH >
- SdkConfig::getInstance()->pack_size_) {
- break;
+ if (msg->msg_.size() + max_tid_size + constants::ATTR_LENGTH >
SdkConfig::getInstance()->pack_size_) {
+ if (!msgs_to_dispatch.empty()) {
+ break;
+ }
}
- msgs_to_dispatch.push_back(msg);
+ std::string msg_key = msg->inlong_group_id_ + msg->inlong_stream_id_;
+ msgs_to_dispatch[msg_key].push_back(msg);
msgs_.pop();
+
total_length = msg->msg_.size() + total_length + constants::ATTR_LENGTH;
+
+ if (tid_stat.find(msg_key) == tid_stat.end()) {
+ tid_stat[msg_key] = 0;
+ }
+ tid_stat[msg_key] = tid_stat[msg_key] + msg->msg_.size() +
constants::ATTR_LENGTH;
+
+ max_tid_size = std::max(tid_stat[msg_key], max_tid_size);
}
cur_len_ = cur_len_ - total_length;
- std::shared_ptr<SendBuffer> send_buffer = BuildSendBuf(msgs_to_dispatch);
+ for (auto it : msgs_to_dispatch) {
+ std::shared_ptr<SendBuffer> send_buffer = BuildSendBuf(it.second);
- ResetPackBuf();
+ ResetPackBuf();
- if (send_buffer == nullptr) {
- CallbalkToUsr(msgs_to_dispatch);
- return SdkCode::kSuccess;
- }
+ if (send_buffer == nullptr) {
+ CallbalkToUsr(it.second);
+ continue;
+ }
- int ret = send_group->PushData(send_buffer);
- if (ret != SdkCode::kSuccess) {
- CallbalkToUsr(msgs_to_dispatch);
+ int ret = send_group->PushData(send_buffer);
+ if (ret != SdkCode::kSuccess) {
+ CallbalkToUsr(it.second);
+ }
}
+
return SdkCode::kSuccess;
}
void RecvGroup::AddMsg(const std::string &msg, std::string client_ip,
- int64_t report_time, UserCallBack call_back) {
+ int64_t report_time, UserCallBack call_back,const
std::string &groupId,
+ const std::string &streamId) {
if (Utils::isLegalTime(report_time))
data_time_ = report_time;
else {
@@ -142,19 +155,11 @@ void RecvGroup::AddMsg(const std::string &msg,
std::string client_ip,
"&__addcol2__ip=" + client_ip;
msgs_.push(std::make_shared<SdkMsg>(msg, client_ip, data_time_, call_back,
data_pack_format_attr, user_client_ip,
- user_report_time));
+ user_report_time,groupId,streamId));
cur_len_ += msg.size() + constants::ATTR_LENGTH;
}
-bool RecvGroup::ShouldPack(int32_t msg_len) {
- if (0 == cur_len_ || msgs_.empty())
- return false;
- if (msg_len + cur_len_ > SdkConfig::getInstance()->pack_size_)
- return true;
- return false;
-}
-
bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,
uint32_t &out_len, uint32_t uniq_id) {
if (pack_data == nullptr) {
@@ -220,7 +225,8 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char
*pack_data,
streamId_num_ == 0) {
groupId_num = 0;
streamId_num = 0;
- groupId_streamId_char = topic_desc_;
+ groupId_streamId_char = "groupId=" + msgs[0]->inlong_group_id_ +
+ "&streamId=" + msgs[0]->inlong_stream_id_;
char_groupId_flag = 0x4;
} else {
groupId_num = groupId_num_;
@@ -240,7 +246,8 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char
*pack_data,
"&node1ip=" + SdkConfig::getInstance()->local_ip_ +
"&rtime1=" + std::to_string(Utils::getCurrentMsTime());
} else {
- attr = topic_desc_;
+ attr = "groupId=" + msgs[0]->inlong_group_id_ +
+ "&streamId=" + msgs[0]->inlong_stream_id_;
}
*(uint16_t *)bodyBegin = htons(attr.size());
bodyBegin += sizeof(uint16_t);
@@ -290,7 +297,9 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char
*pack_data,
// attr
std::string attr;
- attr = topic_desc_;
+ attr = "groupId=" + msgs[0]->inlong_group_id_ +
+ "&streamId=" + msgs[0]->inlong_stream_id_;
+
attr += "&dt=" + std::to_string(data_time_);
attr += "&mid=" + std::to_string(uniq_id);
if (isSnappy)
@@ -354,8 +363,8 @@ RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs) {
}
send_buffer->setLen(len);
send_buffer->setMsgCnt(msg_cnt);
- send_buffer->setInlongGroupId(inlong_group_id_);
- send_buffer->setStreamId(inlong_stream_id_);
+ send_buffer->setInlongGroupId(msgs[0]->inlong_group_id_);
+ send_buffer->setStreamId(msgs[0]->inlong_stream_id_);
send_buffer->setUniqId(uniq_id);
send_buffer->setIsPacked(true);
for (auto it : msgs) {
@@ -368,7 +377,7 @@ RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs) {
void RecvGroup::CallbalkToUsr(std::vector<SdkMsgPtr> &msgs) {
for (auto &it : msgs) {
if (it->cb_) {
- it->cb_(inlong_group_id_.data(), inlong_stream_id_.data(),
+ it->cb_(it->inlong_group_id_.data(), it->inlong_stream_id_.data(),
it->msg_.data(), it->msg_.size(), it->user_report_time_,
it->user_client_ip_.data());
}
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
index f9150f3602..2d1a9e6315 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h
@@ -38,11 +38,8 @@ private:
uint32_t cur_len_;
AtomicInt pack_err_;
uint64_t data_time_;
- std::string inlong_group_id_;
- std::string inlong_stream_id_;
uint16_t groupId_num_;
uint16_t streamId_num_;
- std::string topic_desc_;
uint32_t msg_type_;
mutable std::mutex mutex_;
@@ -50,17 +47,17 @@ private:
uint64_t last_pack_time_;
uint64_t max_recv_size_;
+ std::string group_key_;
- bool ShouldPack(int32_t msg_len);
int32_t DoDispatchMsg();
void AddMsg(const std::string &msg, std::string client_ip,
- int64_t report_time, UserCallBack call_back);
+ int64_t report_time, UserCallBack call_back,const std::string
&groupId,
+ const std::string &streamId);
bool IsZipAndOperate(std::string &res, uint32_t real_cur_len);
inline void ResetPackBuf() { memset(pack_buf_, 0x0, data_capacity_); }
public:
- RecvGroup(const std::string &groupId, const std::string &streamId,
- std::shared_ptr<SendManager> send_manager);
+ RecvGroup(const std::string &group_key,std::shared_ptr<SendManager>
send_manager);
~RecvGroup();
int32_t SendData(const std::string &msg, const std::string &groupId,
@@ -72,8 +69,6 @@ public:
char *data() const { return pack_buf_; }
- std::string groupId() const { return inlong_group_id_; }
-
std::shared_ptr<SendBuffer> BuildSendBuf(std::vector<SdkMsgPtr> &msgs);
void CallbalkToUsr(std::vector<SdkMsgPtr> &msgs);
};
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
index 4591953fa0..f317163799 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
@@ -34,8 +34,8 @@ SendGroup::SendGroup(std::string send_group_key)
if (max_send_queue_num_ <= 0) {
max_send_queue_num_ = kDefaultQueueSize;
}
- LOG_INFO("SendGroup: " << send_group_key_
- << ", max send queue num: " << max_send_queue_num_);
+ LOG_INFO("SendGroup:" << send_group_key_
+ << ",max send queue num:" << max_send_queue_num_);
dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_send_;
load_balance_interval_ = SdkConfig::getInstance()->load_balance_interval_;
heart_beat_interval_ =
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index f2e887d29c..084905e2c9 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -295,7 +295,7 @@ int32_t ProxyManager::GetProxyByClusterId(const std::string
&cluster_id,
proxy_info_vec = it->second;
return SdkCode::kSuccess;
}
-std::string ProxyManager::GetSendGroupKey(const std::string &groupid) {
+std::string ProxyManager::GetGroupKey(const std::string &groupid) {
if (SdkConfig::getInstance()->enable_isolation_) {
return groupid;
}
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
index 18856f61ec..653185798c 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h
@@ -66,7 +66,7 @@ public:
int32_t GetProxyByGroupid(const std::string &inlong_group_id, ProxyInfoVec
&proxy_info_vec);
int32_t GetProxyByClusterId(const std::string &cluster_id,
ProxyInfoVec &proxy_info_vec);
- std::string GetSendGroupKey(const std::string &groupid);
+ std::string GetGroupKey(const std::string &groupid);
bool HasProxy(const std::string &inlong_group_id);
bool CheckGroupid(const std::string &groupid);
bool CheckClusterId(const std::string &cluster_id);
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc
index 077dd4464b..789e98e7e6 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc
@@ -17,6 +17,7 @@
#include "recv_manager.h"
#include "../utils/utils.h"
+#include "proxy_manager.h"
namespace inlong {
RecvManager::RecvManager(std::shared_ptr<SendManager> send_manager)
@@ -24,8 +25,9 @@ RecvManager::RecvManager(std::shared_ptr<SendManager>
send_manager)
exit_flag_(false) {
dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_zip_;
- max_groupid_streamid_num_ = SdkConfig::getInstance()->max_group_id_num_ *
- SdkConfig::getInstance()->max_stream_id_num_;
+ max_groupid_streamid_num_ =
+ std::max(SdkConfig::getInstance()->max_group_id_num_,
+ SdkConfig::getInstance()->max_stream_id_num_);
LOG_INFO("max_groupid_streamid_num " <<max_groupid_streamid_num_);
check_timer_ = std::make_shared<asio::steady_timer>(io_context_);
@@ -54,10 +56,13 @@ RecvManager::~RecvManager() {
}
}
void RecvManager::Run() { io_context_.run(); }
-RecvGroupPtr RecvManager::GetRecvGroup(const std::string &groupId,
- const std::string &streamId) {
+RecvGroupPtr RecvManager::GetRecvGroup(const std::string &groupId) {
std::lock_guard<std::mutex> lck(mutex_);
- auto it = recv_group_map_.find(groupId + streamId);
+ std::string group_key = ProxyManager::GetInstance()->GetGroupKey(groupId);
+ if (group_key.empty()) {
+ return nullptr;
+ }
+ auto it = recv_group_map_.find(group_key);
if (it != recv_group_map_.end()) {
return it->second;
} else {
@@ -66,8 +71,8 @@ RecvGroupPtr RecvManager::GetRecvGroup(const std::string
&groupId,
}
RecvGroupPtr recv_group =
- std::make_shared<RecvGroup>(groupId, streamId, send_manager_);
- recv_group_map_.emplace(groupId + streamId, recv_group);
+ std::make_shared<RecvGroup>(group_key, send_manager_);
+ recv_group_map_.emplace(group_key, recv_group);
return recv_group;
}
}
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h
index 14de991ac4..4a69d2939a 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h
@@ -52,7 +52,7 @@ public:
RecvManager(std::shared_ptr<SendManager> send_manager);
~RecvManager();
void DispatchData(std::error_code error);
- RecvGroupPtr GetRecvGroup(const std::string &bid, const std::string &tid);
+ RecvGroupPtr GetRecvGroup(const std::string &bid);
};
} // namespace inlong
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
index 475fe14b3f..6104411b9a 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
@@ -28,18 +28,16 @@ SendManager::SendManager() : send_group_idx_(0) {
<< SdkConfig::getInstance()->inlong_group_ids_[i]
<< " send group num:"
<< SdkConfig::getInstance()->per_groupid_thread_nums_);
- std::string send_group_key = ProxyManager::GetInstance()->GetSendGroupKey(
+ std::string send_group_key = ProxyManager::GetInstance()->GetGroupKey(
SdkConfig::getInstance()->inlong_group_ids_[i]);
AddSendGroup(send_group_key);
}
}
-SendGroupPtr SendManager::GetSendGroup(const std::string &group_id) {
- std::string send_group_key =
- ProxyManager::GetInstance()->GetSendGroupKey(group_id);
- SendGroupPtr send_group_ptr = DoGetSendGroup(send_group_key);
+SendGroupPtr SendManager::GetSendGroup(const std::string &group_key) {
+ SendGroupPtr send_group_ptr = DoGetSendGroup(group_key);
if (send_group_ptr == nullptr) {
- AddSendGroup(send_group_key);
+ AddSendGroup(group_key);
}
return send_group_ptr;
}
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
index 66e93d67ce..1f2970b17d 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
@@ -87,7 +87,7 @@ public:
void doUserCallBack() {
for (auto it : user_msg_vector_) {
if (it->cb_) {
- it->cb_(inlong_group_id_.data(), inlong_stream_id_.data(),
+ it->cb_(it->inlong_group_id_.data(), it->inlong_stream_id_.data(),
it->msg_.data(), it->msg_.size(), it->user_report_time_,
it->user_client_ip_.data());
}