This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b61dc53c92 [INLONG-9058][SDK] Limit the number of inlong-groupid and 
inlong-streamid (#9059)
b61dc53c92 is described below

commit b61dc53c9254fa79484e526b16c4e1e5ce1b0989
Author: doleyzi <[email protected]>
AuthorDate: Tue Oct 17 16:45:14 2023 +0800

    [INLONG-9058][SDK] Limit the number of inlong-groupid and inlong-streamid 
(#9059)
---
 .../dataproxy-sdk-cpp/release/inc/sdk_conf.h         |  2 ++
 .../dataproxy-sdk-cpp/src/config/sdk_conf.cc         | 20 ++++++++++++++++++++
 .../dataproxy-sdk-cpp/src/core/api_imp.cc            |  8 +++++---
 .../dataproxy-sdk-cpp/src/group/recv_group.cc        | 10 +++++-----
 .../dataproxy-sdk-cpp/src/manager/recv_manager.cc    | 12 ++++++++++--
 .../dataproxy-sdk-cpp/src/manager/recv_manager.h     |  2 ++
 .../dataproxy-sdk-cpp/src/utils/capi_constant.h      |  3 +++
 7 files changed, 47 insertions(+), 10 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
index d9bf4b16c3..deb11f99ed 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h
@@ -47,6 +47,8 @@ public:
       inlong_group_ids_;   // Initialize the inlong groupid collection
   uint32_t recv_buf_size_; // Receive buf size, tid granularity
   uint32_t send_buf_size_; // Send buf size, bid granularity
+  uint32_t max_group_id_num_; // Send buf size, bid granularity
+  uint32_t max_stream_id_num_; // Send buf size, bid granularity
 
   // thread parameters
   uint32_t per_groupid_thread_nums_; // Sending thread per groupid
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
index a4317dc764..340b6f1e66 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc
@@ -83,6 +83,8 @@ void SdkConfig::defaultInit() {
   send_buf_size_ = constants::kSendBufSize;
   recv_buf_size_ = constants::kRecvBufSize;
   max_msg_size_ = constants::kExtPackSize;
+  max_group_id_num_ = constants::kMaxGroupIdNum;
+  max_stream_id_num_=constants::kMaxStreamIdNum;
 
   // Packaging parameters
   enable_pack_ = constants::kEnablePack;
@@ -159,6 +161,22 @@ void SdkConfig::InitCacheParam(const rapidjson::Value 
&doc) {
   } else {
     send_buf_size_ = constants::kSendBufSize;
   }
+
+  if (doc.HasMember("max_group_id_num") && doc["max_group_id_num"].IsInt() &&
+      doc["max_group_id_num"].GetInt() > 0) {
+    const rapidjson::Value &obj = doc["max_group_id_num"];
+    max_group_id_num_ = obj.GetInt();
+  } else {
+    max_group_id_num_ = constants::kMaxGroupIdNum;
+  }
+
+  if (doc.HasMember("max_stream_id_num") && doc["max_stream_id_num"].IsInt() &&
+      doc["max_stream_id_num"].GetInt() > 0) {
+    const rapidjson::Value &obj = doc["max_stream_id_num"];
+    max_stream_id_num_ = obj.GetInt();
+  } else {
+    max_stream_id_num_ = constants::kMaxGroupIdNum;
+  }
 }
 
 void SdkConfig::InitZipParam(const rapidjson::Value &doc) {
@@ -440,6 +458,8 @@ void SdkConfig::ShowClientConfig() {
   LOG_INFO("need_auth: " << need_auth_ ? "true" : "false");
   LOG_INFO("auth_id: " << auth_id_.c_str());
   LOG_INFO("auth_key: " << auth_key_.c_str());
+  LOG_INFO("max_group_id_num: " << max_group_id_num_);
+  LOG_INFO("max_stream_id_num: " << max_stream_id_num_);
 }
 
 } // namespace inlong
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
index 785f648226..fd5b1b8312 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
@@ -17,8 +17,10 @@
 
 #include "api_imp.h"
 #include "../manager/proxy_manager.h"
+#include "../utils/capi_constant.h"
 #include "../utils/logger.h"
 #include "../utils/utils.h"
+
 #include "api_code.h"
 #include <iostream>
 #include <signal.h>
@@ -36,7 +38,7 @@ int32_t ApiImp::InitApi(const char *config_file_path) {
     return SdkCode::kErrorInit;
   }
   max_msg_length_ = std::min(SdkConfig::getInstance()->max_msg_size_,
-                             SdkConfig::getInstance()->pack_size_);
+                             SdkConfig::getInstance()->pack_size_) - 
constants::ATTR_LENGTH;
   local_ip_ = SdkConfig::getInstance()->local_ip_;
 
   return DoInit();
@@ -75,8 +77,8 @@ int32_t ApiImp::SendBase(const std::string inlong_group_id,
   auto recv_group =
       recv_manager_->GetRecvGroup(inlong_group_id, inlong_stream_id);
   if (recv_group == nullptr) {
-    LOG_ERROR("fail to get pack queue, inlong_group_id:%s, inlong_stream_id:%s"
-              << inlong_group_id.c_str() << inlong_stream_id.c_str());
+    LOG_ERROR("fail to get recv group, inlong_group_id:"
+              << inlong_group_id << " inlong_stream_id:" << inlong_stream_id);
     return SdkCode::kFailGetRevGroup;
   }
 
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
index 94b0f8e848..9d41942e97 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc
@@ -18,13 +18,13 @@
 #include "recv_group.h"
 
 #include "../protocol/msg_protocol.h"
+#include "../utils/capi_constant.h"
 #include "../utils/utils.h"
 #include "api_code.h"
 #include <cstdlib>
 #include <functional>
 
 namespace inlong {
-const uint32_t ATTR_LENGTH = 10;
 const uint32_t DEFAULT_PACK_ATTR = 400;
 RecvGroup::RecvGroup(const std::string &inlong_group_id,
                      const std::string &inlong_stream_id,
@@ -96,13 +96,13 @@ int32_t RecvGroup::DoDispatchMsg() {
   std::vector<SdkMsgPtr> msgs_to_dispatch;
   while (!msgs_.empty()) {
     SdkMsgPtr msg = msgs_.front();
-    if (msg->msg_.size() + total_length + ATTR_LENGTH >
+    if (msg->msg_.size() + total_length + constants::ATTR_LENGTH >
         SdkConfig::getInstance()->pack_size_) {
       break;
     }
     msgs_to_dispatch.push_back(msg);
     msgs_.pop();
-    total_length = msg->msg_.size() + total_length + ATTR_LENGTH;
+    total_length = msg->msg_.size() + total_length + constants::ATTR_LENGTH;
   }
 
   cur_len_ = cur_len_ - total_length;
@@ -144,7 +144,7 @@ void RecvGroup::AddMsg(const std::string &msg, std::string 
client_ip,
                                       data_pack_format_attr, user_client_ip,
                                       user_report_time));
 
-  cur_len_ += msg.size() + ATTR_LENGTH;
+  cur_len_ += msg.size() + constants::ATTR_LENGTH;
 }
 
 bool RecvGroup::ShouldPack(int32_t msg_len) {
@@ -322,7 +322,7 @@ bool RecvGroup::IsZipAndOperate(std::string &res, uint32_t 
real_cur_len) {
 }
 
 void RecvGroup::DispatchMsg(bool exit) {
-  if (cur_len_ <= ATTR_LENGTH || msgs_.empty())
+  if (cur_len_ <= constants::ATTR_LENGTH || msgs_.empty())
     return;
   bool len_enough = cur_len_ > SdkConfig::getInstance()->pack_size_;
   bool time_enough = (Utils::getCurrentMsTime() - last_pack_time_) >
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc
index d5a663b426..077dd4464b 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.cc
@@ -24,6 +24,10 @@ RecvManager::RecvManager(std::shared_ptr<SendManager> 
send_manager)
       exit_flag_(false) {
   dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_zip_;
 
+  max_groupid_streamid_num_ = SdkConfig::getInstance()->max_group_id_num_ *
+                              SdkConfig::getInstance()->max_stream_id_num_;
+  LOG_INFO("max_groupid_streamid_num " <<max_groupid_streamid_num_);
+
   check_timer_ = std::make_shared<asio::steady_timer>(io_context_);
   check_timer_->expires_after(std::chrono::milliseconds(10));
   check_timer_->async_wait(
@@ -54,9 +58,13 @@ RecvGroupPtr RecvManager::GetRecvGroup(const std::string 
&groupId,
                                        const std::string &streamId) {
   std::lock_guard<std::mutex> lck(mutex_);
   auto it = recv_group_map_.find(groupId + streamId);
-  if (it != recv_group_map_.end())
+  if (it != recv_group_map_.end()) {
     return it->second;
-  else {
+  } else {
+    if (recv_group_map_.size() > max_groupid_streamid_num_) {
+      return nullptr;
+    }
+
     RecvGroupPtr recv_group =
         std::make_shared<RecvGroup>(groupId, streamId, send_manager_);
     recv_group_map_.emplace(groupId + streamId, recv_group);
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h
index ed1c0dc7cb..14de991ac4 100644
--- 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/recv_manager.h
@@ -44,6 +44,8 @@ private:
 
   uint32_t dispatch_interval_;
 
+  uint64_t max_groupid_streamid_num_;
+
   void Run();
 
 public:
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index 5e67dd9266..274c3161e2 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -38,6 +38,8 @@ static const uint8_t kBinSnappyFlag = 1 << 5;
 static const int32_t kPerGroupidThreadNums = 1;
 static const int32_t kSendBufSize = 10240000;
 static const int32_t kRecvBufSize = 10240000;
+static const uint32_t kMaxGroupIdNum = 50;
+static const uint32_t kMaxStreamIdNum = 100;
 
 static const int32_t kDispatchIntervalZip = 8;
 static const int32_t kDispatchIntervalSend = 10;
@@ -86,6 +88,7 @@ static const char kProtocolType[] = "TCP";
 static const bool kNeedAuth = false;
 
 static const uint32_t kMaxAttrLen = 2048;
+const uint32_t ATTR_LENGTH = 10;
 
 } // namespace constants
 } // namespace inlong

Reply via email to