This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e850ed756 [INLONG-8868][SDK] Optimize send data framework for dataproxy cpp sdk (#8869)
6e850ed756 is described below

commit 6e850ed75631224dc1a0095e9b74fe3345e5abd0
Author: doleyzi <[email protected]>
AuthorDate: Sat Sep 9 18:06:53 2023 +0800

    [INLONG-8868][SDK] Optimize send data framework for dataproxy cpp sdk (#8869)
    
    Co-authored-by: doleyzi <[email protected]>
---
 .../dataproxy-sdk-cpp/src/group/send_group.cc      | 249 +++++++++++++++++++++
 .../dataproxy-sdk-cpp/src/group/send_group.h       |  82 +++++++
 .../dataproxy-sdk-cpp/src/manager/send_manager.cc  |  84 +++++++
 .../dataproxy-sdk-cpp/src/manager/send_manager.h   |  47 ++++
 4 files changed, 462 insertions(+)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
new file mode 100644
index 0000000000..319b2a9ad5
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "send_group.h"
+#include "api_code.h"
+#include "proxy_conf_manager.h"
+#include <algorithm>
+#include <random>
+
+namespace inlong {
+// Fallback send-queue capacity, used when the configured buffer and pack
+// sizes do not yield a positive capacity.
+const int kDefaultQueueSize = 20;
+
+/**
+ * Builds the send group for `group_id`.
+ *
+ * Sizes the send queue from SdkConfig (send_buf_size_ / pack_size_), arms the
+ * dispatch timer and the proxy-refresh timer on this group's io_context, and
+ * finally starts the thread that runs that io_context.
+ *
+ * @param group_id  group id whose proxy list this group sends to
+ */
+SendGroup::SendGroup(std::string group_id)
+    : work_(asio::make_work_guard(io_context_)), group_id_(group_id),
+      send_idx_(0) {
+  auto &&config = SdkConfig::getInstance();
+
+  // Guard the division (a zero pack size would divide by zero) and test the
+  // quotient before it is stored in the unsigned member, so a non-positive
+  // result reliably falls back to the default.
+  const auto pack_size = config->pack_size_;
+  const auto queue_num = pack_size > 0 ? config->send_buf_size_ / pack_size : 0;
+  max_send_queue_num_ = queue_num > 0 ? static_cast<uint32_t>(queue_num)
+                                      : kDefaultQueueSize;
+  LOG_INFO("SendGroup  max_send_queue_num " << max_send_queue_num_);
+
+  dispatch_interval_ = config->dispatch_interval_send_;
+  tcp_clients_old_ = nullptr;
+  tcp_clients_ = std::make_shared<std::vector<TcpClientTPtrT>>();
+  tcp_clients_->reserve(config->max_proxy_num_);
+
+  // First tick goes through PreDispatchData, which then hands over to the
+  // self re-arming DispatchData.
+  send_timer_ = std::make_shared<asio::steady_timer>(io_context_);
+  send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_));
+  send_timer_->async_wait(
+      [this](const std::error_code &ec) { PreDispatchData(ec); });
+
+  // Load the proxy list almost immediately (1 ms) after start-up.
+  update_conf_timer_ = std::make_shared<asio::steady_timer>(io_context_);
+  update_conf_timer_->expires_after(std::chrono::milliseconds(1));
+  update_conf_timer_->async_wait(
+      [this](const std::error_code &ec) { UpdateConf(ec); });
+
+  current_proxy_vec_.reserve(config->max_proxy_num_);
+
+  // Start the worker last so every member it touches is already initialised.
+  thread_ = std::thread(&SendGroup::Run, this);
+}
+/**
+ * Shuts the group down: stops the io_context, waits for the worker thread to
+ * leave Run(), then cancels the timers.
+ *
+ * The loop is stopped and joined BEFORE the timers are touched: the timer
+ * handlers re-arm the same timer objects on the worker thread, and an asio
+ * timer must not be used from two threads at once.
+ */
+SendGroup::~SendGroup() {
+  LOG_INFO("~SendGroup ");
+  io_context_.stop();
+  if (thread_.joinable()) {
+    thread_.join();
+  }
+  // The worker is gone, so these calls can no longer race with a handler.
+  send_timer_->cancel();
+  update_conf_timer_->cancel();
+}
+// Worker-thread entry point: runs this group's io_context event loop.
+void SendGroup::Run() {
+  io_context_.run();
+}
+/**
+ * First tick of the send timer. Does no sending itself; it waits one more
+ * dispatch interval and then hands the timer over to DispatchData.
+ * Does nothing when the wait completed with an error.
+ */
+void SendGroup::PreDispatchData(std::error_code error) {
+  if (!error) {
+    const auto delay = std::chrono::milliseconds(dispatch_interval_);
+    send_timer_->expires_after(delay);
+    send_timer_->async_wait(
+        std::bind(&SendGroup::DispatchData, this, std::placeholders::_1));
+  }
+}
+
+/**
+ * Periodic send-timer callback. Starting at send_idx_, walks the current TCP
+ * clients and gives each free client one buffer popped from the queue; stops
+ * early once the queue is empty. Re-arms the timer afterwards, unless the
+ * wait itself completed with an error.
+ */
+void SendGroup::DispatchData(std::error_code error) {
+  if (error) {
+    return;
+  }
+  try {
+    // Read lock: UpdateConf swaps tcp_clients_ under the write lock.
+    unique_read_lock<read_write_mutex> rdlck(remote_proxy_list_mutex_);
+    if (tcp_clients_ != nullptr) {
+      auto &clients = *tcp_clients_;
+      // Wrap around once the previous pass ran off the end of the list.
+      if (send_idx_ >= clients.size()) {
+        send_idx_ = 0;
+      }
+
+      for (; send_idx_ < clients.size(); send_idx_++) {
+        if (!clients[send_idx_]->isFree()) {
+          continue;
+        }
+        SendBufferPtrT pending = PopData();
+        if (pending == nullptr) {
+          // Queue drained; keep send_idx_ so the next tick resumes here.
+          break;
+        }
+        clients[send_idx_]->write(pending);
+      }
+    }
+  } catch (std::exception &e) {
+    LOG_ERROR("Exception " << e.what());
+  }
+  const auto delay = std::chrono::milliseconds(dispatch_interval_);
+  send_timer_->expires_after(delay);
+  send_timer_->async_wait(
+      std::bind(&SendGroup::DispatchData, this, std::placeholders::_1));
+}
+
+// True when the number of queued buffers exceeds max_send_queue_num_.
+bool SendGroup::IsFull() {
+  const uint32_t queued = GetQueueSize();
+  return queued > max_send_queue_num_;
+}
+
+/**
+ * Appends a buffer to the send queue.
+ *
+ * The capacity check and the push happen under a single acquisition of
+ * mutex_, so concurrent producers cannot all pass the check and then
+ * overshoot the limit together. The check uses the same condition as
+ * IsFull(); IsFull() is not called here because it takes mutex_ itself.
+ *
+ * @param send_buffer_ptr  buffer to enqueue
+ * @return SdkCode::kSendBufferFull when the queue is over capacity,
+ *         otherwise SdkCode::kSuccess
+ */
+uint32_t SendGroup::PushData(SendBufferPtrT send_buffer_ptr) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  if (send_buf_list_.size() > max_send_queue_num_) {
+    return SdkCode::kSendBufferFull;
+  }
+  send_buf_list_.push(send_buffer_ptr);
+  return SdkCode::kSuccess;
+}
+
+/**
+ * Periodic callback that refreshes the TCP clients from the proxy list of
+ * group_id_.
+ *
+ * Releases the clients retired by the previous refresh, fetches the current
+ * proxy list and, when it differs from current_proxy_vec_, builds up to
+ * max_proxy_num_ new clients, swaps them in under the write lock and closes
+ * the previous ones. Every path re-arms the timer for kTimerMinute
+ * milliseconds, except when the wait completed with an error.
+ */
+void SendGroup::UpdateConf(std::error_code error) {
+  if (error) {
+    return;
+  }
+  LOG_INFO("start UpdateConf.");
+
+  // Every exit path below re-arms the refresh timer; keep that in one place.
+  auto schedule_next = [this]() {
+    update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
+    update_conf_timer_->async_wait(
+        std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
+  };
+
+  ClearOldTcpClients();
+
+  ProxyInfoVec new_proxy_info;
+  if (proxyConfManager::GetInstance()->GetproxyByBid(
+          group_id_, new_proxy_info) != kSuccess ||
+      new_proxy_info.empty()) {
+    schedule_next();
+    return;
+  }
+
+  if (!IsConfChanged(current_proxy_vec_, new_proxy_info)) {
+    LOG_INFO("Don`t need UpdateConf. current proxy size("
+             << current_proxy_vec_.size() << ")=proxy size("
+             << new_proxy_info.size() << ")");
+    schedule_next();
+    return;
+  }
+
+  // Use at most max_proxy_num_ proxies, fewer if the list is shorter.
+  uint32_t proxy_num = SdkConfig::getInstance()->max_proxy_num_;
+  if (proxy_num > new_proxy_info.size()) {
+    proxy_num = static_cast<uint32_t>(new_proxy_info.size());
+  }
+
+  // std::make_shared reports allocation failure by throwing, never by
+  // returning null, so no null check is needed here.
+  std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_tmp =
+      std::make_shared<std::vector<TcpClientTPtrT>>();
+  tcp_clients_tmp->reserve(proxy_num);
+  for (uint32_t i = 0; i < proxy_num; i++) {
+    auto &proxy = new_proxy_info[i];
+    TcpClientTPtrT client =
+        std::make_shared<TcpClient>(io_context_, proxy.ip(), proxy.port());
+    tcp_clients_tmp->push_back(client);
+    LOG_INFO("new proxy info.[" << proxy.ip() << ":" << proxy.port()
+                                << "]");
+  }
+
+  {
+    LOG_INFO("do change tcp clients.");
+    // Write lock: DispatchData and IsAvailable read tcp_clients_ under the
+    // matching read lock.
+    unique_write_lock<read_write_mutex> wtlck(remote_proxy_list_mutex_);
+    tcp_clients_old_ = tcp_clients_;
+    tcp_clients_ = tcp_clients_tmp;
+  }
+
+  // Close the retired clients now; they are released at the start of the
+  // next refresh by ClearOldTcpClients().
+  if (tcp_clients_old_ != nullptr) {
+    for (auto &old_client : *tcp_clients_old_) {
+      old_client->DoClose();
+    }
+  }
+
+  current_proxy_vec_ = new_proxy_info;
+
+  schedule_next();
+
+  LOG_INFO("Finished UpdateConf.");
+}
+
+// Removes and returns the oldest queued buffer, or nullptr when the queue is
+// empty. Takes mutex_ for the duration of the call.
+SendBufferPtrT SendGroup::PopData() {
+  std::lock_guard<std::mutex> guard(mutex_);
+  SendBufferPtrT head = nullptr;
+  if (!send_buf_list_.empty()) {
+    head = send_buf_list_.front();
+    send_buf_list_.pop();
+  }
+  return head;
+}
+
+// Returns the number of buffers currently queued, read under mutex_.
+uint32_t SendGroup::GetQueueSize() {
+  std::lock_guard<std::mutex> guard(mutex_);
+  const uint32_t pending = send_buf_list_.size();
+  return pending;
+}
+
+/**
+ * Decides whether the proxy list has changed.
+ *
+ * @param current_proxy_vec  proxies the current clients were built from
+ * @param new_proxy_vec      freshly fetched proxy list
+ * @return false when the new list is empty; true when the sizes differ or
+ *         when some current proxy (matched on both ip and port) is missing
+ *         from the new list; false otherwise
+ */
+bool SendGroup::IsConfChanged(ProxyInfoVec &current_proxy_vec,
+                              ProxyInfoVec &new_proxy_vec) {
+  if (new_proxy_vec.empty())
+    return false;
+  if (current_proxy_vec.size() != new_proxy_vec.size()) {
+    return true;
+  }
+
+  for (auto &current_bu : current_proxy_vec) {
+    // A proxy is still present only when BOTH ip and port match; a changed
+    // port on an unchanged ip counts as a configuration change.
+    const bool still_present = std::any_of(
+        new_proxy_vec.begin(), new_proxy_vec.end(),
+        [&current_bu](ProxyInfo &candidate) {
+          return current_bu.ip() == candidate.ip() &&
+                 current_bu.port() == candidate.port();
+        });
+    if (!still_present) {
+      LOG_INFO("current proxy ip." << current_bu.ip() << ":"
+                                   << current_bu.port()
+                                   << " can`t find in proxy.");
+      return true;
+    }
+  }
+  return false;
+}
+
+// True when the group currently holds at least one TCP client. Reads
+// tcp_clients_ under the read lock.
+bool SendGroup::IsAvailable() {
+  unique_read_lock<read_write_mutex> rdlck(remote_proxy_list_mutex_);
+  return tcp_clients_ != nullptr && !tcp_clients_->empty();
+}
+// Drops the retired client list, if there is one: empties the vector and
+// then releases this group's reference to it.
+void SendGroup::ClearOldTcpClients() {
+  if (tcp_clients_old_ == nullptr) {
+    return;
+  }
+  LOG_INFO("ClearOldTcpClients." << tcp_clients_old_->size());
+  tcp_clients_old_->clear();
+  tcp_clients_old_.reset();
+}
+} // namespace inlong
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
new file mode 100644
index 0000000000..9b929ca4d8
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef INLONG_SDK_SEND_GROUP_H
+#define INLONG_SDK_SEND_GROUP_H
+
+#include "../config/proxy_info.h"
+#include "../utils/send_buffer.h"
+#include "tcp_client.h"
+#include <queue>
+namespace inlong {
+const int kTimerMiSeconds = 10; // NOTE(review): presumably a 10 ms timer period; no use is visible in this diff - confirm
+const int kTimerMinute = 60000; // one minute in milliseconds; SendGroup::UpdateConf re-arms its refresh timer with this
+
+using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>; // shared handle to an asio steady timer
+using IOContext = asio::io_context; // shorthand for the asio io_context type
+using io_context_work =
+    asio::executor_work_guard<asio::io_context::executor_type>; // work-guard type for an io_context executor
+
+/**
+ * One sending unit for a group id. Owns an io_context with a dedicated
+ * worker thread, a queue of buffers waiting to be sent, and the TCP clients
+ * for the group's proxies. Two timers on the io_context drive it: one
+ * dispatches queued buffers to free clients, the other refreshes the proxy
+ * list.
+ */
+class SendGroup : noncopyable {
+private:
+  IOContext io_context_;
+  // Work guard created in the constructor for io_context_.
+  io_context_work work_;
+  // Worker thread; its entry point is Run().
+  std::thread thread_;
+  // Runs io_context_ on the worker thread.
+  void Run();
+
+public:
+  // Clients currently used for sending. Read under remote_proxy_list_mutex_
+  // (read lock) and replaced by UpdateConf under the write lock.
+  std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_;
+  // Clients retired by the last refresh; closed right away and released at
+  // the start of the next refresh.
+  std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_old_;
+
+  // Proxy list that tcp_clients_ was built from.
+  ProxyInfoVec current_proxy_vec_;
+  // NOTE(review): not referenced by send_group.cc in this commit - confirm
+  // whether it is still needed.
+  TcpClientTPtrVecItT current_client_;
+  // Buffers waiting to be sent; guarded by mutex_.
+  std::queue<SendBufferPtrT> send_buf_list_;
+
+  // Drives PreDispatchData / DispatchData.
+  SteadyTimerPtr send_timer_;
+  // Drives UpdateConf.
+  SteadyTimerPtr update_conf_timer_;
+
+  // NOTE(review): not referenced by send_group.cc in this commit - confirm.
+  read_write_mutex send_group_mutex_;
+  // Guards tcp_clients_.
+  read_write_mutex remote_proxy_list_mutex_;
+  // Guards send_buf_list_.
+  std::mutex mutex_;
+  std::string group_id_;
+
+  // Index of the next client DispatchData will look at.
+  std::uint32_t send_idx_;
+  // Queue capacity used by IsFull().
+  uint32_t max_send_queue_num_;
+
+  // explicit: a group id string must never convert to a SendGroup silently.
+  explicit SendGroup(std::string group_id);
+  ~SendGroup();
+
+  // First send-timer tick; waits one more interval, then hands over to
+  // DispatchData.
+  void PreDispatchData(std::error_code error);
+  // Periodic tick: hands queued buffers to free clients and re-arms itself.
+  void DispatchData(std::error_code error);
+  // True when the queue holds more than max_send_queue_num_ buffers.
+  bool IsFull();
+  // Enqueues a buffer; returns SdkCode::kSendBufferFull when the queue is
+  // full, otherwise SdkCode::kSuccess.
+  uint32_t PushData(SendBufferPtrT send_buffer_ptr);
+  // Dequeues the oldest buffer, or returns nullptr when the queue is empty.
+  SendBufferPtrT PopData();
+  // Number of buffers currently queued.
+  uint32_t GetQueueSize();
+  // Periodic tick: rebuilds the clients when the proxy list has changed.
+  void UpdateConf(std::error_code error);
+  // True when new_proxy_vec differs from current_proxy_vec.
+  bool IsConfChanged(ProxyInfoVec &current_proxy_vec,
+                     ProxyInfoVec &new_proxy_vec);
+  // True when at least one TCP client exists.
+  bool IsAvailable();
+  // Dispatch period in milliseconds, read from SdkConfig in the constructor.
+  uint32_t dispatch_interval_;
+
+  // Releases the clients retired by the previous refresh.
+  void ClearOldTcpClients();
+};
+using SendGroupPtr = std::shared_ptr<SendGroup>;
+} // namespace inlong
+
+#endif // INLONG_SDK_SEND_GROUP_H
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
new file mode 100644
index 0000000000..c5c484a2ee
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "send_manager.h"
+#include "../utils/utils.h"
+#include "bus_conf_manager.h"
+namespace inlong {
+// Creates the send groups for every group id listed in SdkConfig::group_ids_.
+SendManager::SendManager() : send_group_idx_(0) {
+  for (int32_t idx = 0; idx < SdkConfig::getInstance()->group_ids_.size();
+       idx++) {
+    const auto &configured_id = SdkConfig::getInstance()->group_ids_[idx];
+    LOG_INFO("SendManager, group_id:"
+             << configured_id << " send group num:"
+             << SdkConfig::getInstance()->per_group_id_thread_nums_);
+    DoAddSendGroup(configured_id);
+  }
+}
+
+/**
+ * Returns a send group for `group_id`, creating the groups on first use.
+ *
+ * When no group exists yet, the groups are added and then looked up again,
+ * so the first caller gets a usable group rather than nullptr.
+ *
+ * @return a send group, or nullptr when none exists and none could be added
+ */
+SendGroupPtr SendManager::GetSendGroup(const std::string &group_id) {
+  SendGroupPtr send_group_ptr = DoGetSendGroup(group_id);
+  if (send_group_ptr == nullptr && AddSendGroup(group_id)) {
+    send_group_ptr = DoGetSendGroup(group_id);
+  }
+  return send_group_ptr;
+}
+
+/**
+ * Adds the send groups for `group_id` if BusConfManager knows that id.
+ *
+ * @return false when the id is unknown to BusConfManager, true once
+ *         DoAddSendGroup has run (including when the groups already existed)
+ */
+bool SendManager::AddSendGroup(const std::string &group_id) {
+  if (!BusConfManager::GetInstance()->IsBusExist(group_id)) {
+    LOG_ERROR("bus is not exist." << group_id);
+    return false;
+  }
+  DoAddSendGroup(group_id);
+  return true;
+}
+
+// Creates per_group_id_thread_nums_ send groups for `group_id` and stores
+// them in the map, under the write lock. Leaves an existing entry untouched.
+void SendManager::DoAddSendGroup(const std::string &group_id) {
+  unique_write_lock<read_write_mutex> wtlck(group_id_2_send_group_map_rwmutex_);
+  if (group_id_2_send_group_map_.find(group_id) !=
+      group_id_2_send_group_map_.end()) {
+    LOG_WARN("send group has exist." << group_id);
+    return;
+  }
+
+  const auto group_num = SdkConfig::getInstance()->per_group_id_thread_nums_;
+  std::vector<SendGroupPtr> created;
+  created.reserve(group_num);
+  for (int32_t n = 0; n < group_num; n++) {
+    created.push_back(std::make_shared<SendGroup>(group_id));
+  }
+  group_id_2_send_group_map_[group_id] = created;
+}
+
+/**
+ * Picks the next send group for `group_id` in round-robin order.
+ *
+ * @return the selected group, or nullptr when the id has no entry or its
+ *         entry is empty
+ */
+SendGroupPtr SendManager::DoGetSendGroup(const std::string &group_id) {
+  unique_read_lock<read_write_mutex> rdlck(group_id_2_send_group_map_rwmutex_);
+  auto send_group_map = group_id_2_send_group_map_.find(group_id);
+  if (send_group_map == group_id_2_send_group_map_.end()) {
+    LOG_ERROR("fail to get send group, group_id:" << group_id);
+    return nullptr;
+  }
+  // Bind by reference: copying the vector would copy every shared_ptr on
+  // each lookup.
+  const auto &send_group_vec = send_group_map->second;
+  if (send_group_vec.empty()) {
+    return nullptr;
+  }
+  // Compute the index in a local and use that local for the access. Several
+  // readers can be here at once, so re-reading the shared counter after the
+  // bounds check could see another caller's increment and go out of range.
+  uint32_t next_idx = send_group_idx_ + 1;
+  if (next_idx >= send_group_vec.size()) {
+    next_idx = 0;
+  }
+  send_group_idx_ = next_idx;
+  return send_group_vec[next_idx];
+}
+
+} // namespace inlong
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h
new file mode 100644
index 0000000000..d6c42044f4
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef INLONG_SDK_SEND_MANAGER_H
+#define INLONG_SDK_SEND_MANAGER_H
+
+#include "../group/send_group.h"
+#include "../utils/read_write_mutex.h"
+#include <atomic>
+#include <unordered_map>
+
+namespace inlong {
+using namespace inlong;
+
+/**
+ * Registry of send groups, keyed by group id. Each id maps to a vector of
+ * SendGroup instances that GetSendGroup hands out in round-robin order.
+ */
+class SendManager : noncopyable {
+private:
+  // Guards group_id_2_send_group_map_.
+  read_write_mutex group_id_2_send_group_map_rwmutex_;
+  std::unordered_map<std::string, std::vector<SendGroupPtr>>
+      group_id_2_send_group_map_;
+  // Round-robin lookup under the read lock; nullptr when the id has no
+  // groups.
+  SendGroupPtr DoGetSendGroup(const std::string &group_id);
+  // Creates and stores the groups for an id under the write lock.
+  void DoAddSendGroup(const std::string &group_id);
+  // Round-robin cursor. It is updated by lookups that hold only the shared
+  // read lock, so it is atomic; volatile gives no thread-safety guarantee.
+  std::atomic<uint32_t> send_group_idx_;
+
+public:
+  // Creates the groups for every group id configured in SdkConfig.
+  SendManager();
+  virtual ~SendManager() {}
+  // Returns a send group for the id, adding the groups on first use.
+  SendGroupPtr GetSendGroup(const std::string &group_id);
+  // Adds the groups for an id if BusConfManager knows it.
+  bool AddSendGroup(const std::string &group_id);
+};
+} // namespace inlong
+
+#endif // INLONG_SDK_SEND_MANAGER_H

Reply via email to