This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b61dc53c92 [INLONG-9058][SDK] Limit the number of inlong-groupid and inlong-streamid (#9059)
b61dc53c92 is described below
commit b61dc53c9254fa79484e526b16c4e1e5ce1b0989
Author: doleyzi <[email protected]>
AuthorDate: Tue Oct 17 16:45:14 2023 +0800
[INLONG-9058][SDK] Limit the number of inlong-groupid and inlong-streamid (#9059)
---
.../dataproxy-sdk-cpp/release/inc/sdk_conf.h | 2 ++
.../dataproxy-sdk-cpp/src/config/sdk_conf.cc | 20 ++++++++++++++++++++
.../dataproxy-sdk-cpp/src/core/api_imp.cc | 8 +++++---
.../dataproxy-sdk-cpp/src/group/recv_group.cc | 10 +++++-----
.../dataproxy-sdk-cpp/src/manager/recv_manager.cc | 12 ++++++++++--
.../dataproxy-sdk-cpp/src/manager/recv_manager.h | 2 ++
.../dataproxy-sdk-cpp/src/utils/capi_constant.h | 3 +++
7 files changed, 47 insertions(+), 10 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
index d9bf4b16c3..deb11f99ed 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
@@ -47,6 +47,8 @@ public:
inlong_group_ids_; // Initialize the inlong groupid collection
uint32_t recv_buf_size_; // Receive buf size, tid granularity
uint32_t send_buf_size_; // Send buf size, bid granularity
+ uint32_t max_group_id_num_; // Upper limit on the number of inlong group ids
+ uint32_t max_stream_id_num_; // Upper limit on the number of inlong stream ids
// thread parameters
uint32_t per_groupid_thread_nums_; // Sending thread per groupid
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index a4317dc764..340b6f1e66 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -83,6 +83,8 @@ void SdkConfig::defaultInit() {
send_buf_size_ = constants::kSendBufSize;
recv_buf_size_ = constants::kRecvBufSize;
max_msg_size_ = constants::kExtPackSize;
+ max_group_id_num_ = constants::kMaxGroupIdNum;
+ max_stream_id_num_=constants::kMaxStreamIdNum;
// Packaging parameters
enable_pack_ = constants::kEnablePack;
@@ -159,6 +161,22 @@ void SdkConfig::InitCacheParam(const rapidjson::Value &doc) {
} else {
send_buf_size_ = constants::kSendBufSize;
}
+
+ if (doc.HasMember("max_group_id_num") && doc["max_group_id_num"].IsInt() &&
+ doc["max_group_id_num"].GetInt() > 0) {
+ const rapidjson::Value &obj = doc["max_group_id_num"];
+ max_group_id_num_ = obj.GetInt();
+ } else {
+ max_group_id_num_ = constants::kMaxGroupIdNum;
+ }
+
+ if (doc.HasMember("max_stream_id_num") && doc["max_stream_id_num"].IsInt() &&
+ doc["max_stream_id_num"].GetInt() > 0) {
+ const rapidjson::Value &obj = doc["max_stream_id_num"];
+ max_stream_id_num_ = obj.GetInt();
+ } else {
+ max_stream_id_num_ = constants::kMaxGroupIdNum;
+ }
}
void SdkConfig::InitZipParam(const rapidjson::Value &doc) {
@@ -440,6 +458,8 @@ void SdkConfig::ShowClientConfig() {
LOG_INFO("need_auth: " << need_auth_ ? "true" : "false");
LOG_INFO("auth_id: " << auth_id_.c_str());
LOG_INFO("auth_key: " << auth_key_.c_str());
+ LOG_INFO("max_group_id_num: " << max_group_id_num_);
+ LOG_INFO("max_stream_id_num: " << max_stream_id_num_);
}
} // namespace inlong
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
index 785f648226..fd5b1b8312 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
@@ -17,8 +17,10 @@
#include "api_imp.h"
#include "../manager/proxy_manager.h"
+#include "../utils/capi_constant.h"
#include "../utils/logger.h"
#include "../utils/utils.h"
+
#include "api_code.h"
#include <iostream>
#include <signal.h>
@@ -36,7 +38,7 @@ int32_t ApiImp::InitApi(const char *config_file_path) {
return SdkCode::kErrorInit;
}
max_msg_length_ = std::min(SdkConfig::getInstance()->max_msg_size_,
- SdkConfig::getInstance()->pack_size_);
+ SdkConfig::getInstance()->pack_size_) -
constants::ATTR_LENGTH;
local_ip_ = SdkConfig::getInstance()->local_ip_;
return DoInit();
@@ -75,8 +77,8 @@ int32_t ApiImp::SendBase(const std::string inlong_group_id,
auto recv_group =
recv_manager_->GetRecvGroup(inlong_group_id, inlong_stream_id);
if (recv_group == nullptr) {
- LOG_ERROR("fail to get pack queue, inlong_group_id:%s, inlong_stream_id:%s"
- << inlong_group_id.c_str() << inlong_stream_id.c_str());
+ LOG_ERROR("fail to get recv group, inlong_group_id:"
+ << inlong_group_id << " inlong_stream_id:" << inlong_stream_id);
return SdkCode::kFailGetRevGroup;
}
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
index 94b0f8e848..9d41942e97 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
@@ -18,13 +18,13 @@
#include "recv_group.h"
#include "../protocol/msg_protocol.h"
+#include "../utils/capi_constant.h"
#include "../utils/utils.h"
#include "api_code.h"
#include <cstdlib>
#include <functional>
namespace inlong {
-const uint32_t ATTR_LENGTH = 10;
const uint32_t DEFAULT_PACK_ATTR = 400;
RecvGroup::RecvGroup(const std::string &inlong_group_id,
const std::string &inlong_stream_id,
@@ -96,13 +96,13 @@ int32_t RecvGroup::DoDispatchMsg() {
std::vector<SdkMsgPtr> msgs_to_dispatch;
while (!msgs_.empty()) {
SdkMsgPtr msg = msgs_.front();
- if (msg->msg_.size() + total_length + ATTR_LENGTH >
+ if (msg->msg_.size() + total_length + constants::ATTR_LENGTH >
SdkConfig::getInstance()->pack_size_) {
break;
}
msgs_to_dispatch.push_back(msg);
msgs_.pop();
- total_length = msg->msg_.size() + total_length + ATTR_LENGTH;
+ total_length = msg->msg_.size() + total_length + constants::ATTR_LENGTH;
}
cur_len_ = cur_len_ - total_length;
@@ -144,7 +144,7 @@ void RecvGroup::AddMsg(const std::string &msg, std::string client_ip,
data_pack_format_attr, user_client_ip,
user_report_time));
- cur_len_ += msg.size() + ATTR_LENGTH;
+ cur_len_ += msg.size() + constants::ATTR_LENGTH;
}
bool RecvGroup::ShouldPack(int32_t msg_len) {
@@ -322,7 +322,7 @@ bool RecvGroup::IsZipAndOperate(std::string &res, uint32_t real_cur_len) {
}
void RecvGroup::DispatchMsg(bool exit) {
- if (cur_len_ <= ATTR_LENGTH || msgs_.empty())
+ if (cur_len_ <= constants::ATTR_LENGTH || msgs_.empty())
return;
bool len_enough = cur_len_ > SdkConfig::getInstance()->pack_size_;
bool time_enough = (Utils::getCurrentMsTime() - last_pack_time_) >
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc
index d5a663b426..077dd4464b 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc
@@ -24,6 +24,10 @@ RecvManager::RecvManager(std::shared_ptr<SendManager> send_manager)
exit_flag_(false) {
dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_zip_;
+ max_groupid_streamid_num_ = SdkConfig::getInstance()->max_group_id_num_ *
+ SdkConfig::getInstance()->max_stream_id_num_;
+ LOG_INFO("max_groupid_streamid_num " <<max_groupid_streamid_num_);
+
check_timer_ = std::make_shared<asio::steady_timer>(io_context_);
check_timer_->expires_after(std::chrono::milliseconds(10));
check_timer_->async_wait(
@@ -54,9 +58,13 @@ RecvGroupPtr RecvManager::GetRecvGroup(const std::string &groupId,
const std::string &streamId) {
std::lock_guard<std::mutex> lck(mutex_);
auto it = recv_group_map_.find(groupId + streamId);
- if (it != recv_group_map_.end())
+ if (it != recv_group_map_.end()) {
return it->second;
- else {
+ } else {
+ if (recv_group_map_.size() > max_groupid_streamid_num_) {
+ return nullptr;
+ }
+
RecvGroupPtr recv_group =
std::make_shared<RecvGroup>(groupId, streamId, send_manager_);
recv_group_map_.emplace(groupId + streamId, recv_group);
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h
index ed1c0dc7cb..14de991ac4 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h
@@ -44,6 +44,8 @@ private:
uint32_t dispatch_interval_;
+ uint64_t max_groupid_streamid_num_;
+
void Run();
public:
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index 5e67dd9266..274c3161e2 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -38,6 +38,8 @@ static const uint8_t kBinSnappyFlag = 1 << 5;
static const int32_t kPerGroupidThreadNums = 1;
static const int32_t kSendBufSize = 10240000;
static const int32_t kRecvBufSize = 10240000;
+static const uint32_t kMaxGroupIdNum = 50;
+static const uint32_t kMaxStreamIdNum = 100;
static const int32_t kDispatchIntervalZip = 8;
static const int32_t kDispatchIntervalSend = 10;
@@ -86,6 +88,7 @@ static const char kProtocolType[] = "TCP";
static const bool kNeedAuth = false;
static const uint32_t kMaxAttrLen = 2048;
+const uint32_t ATTR_LENGTH = 10;
} // namespace constants
} // namespace inlong