This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6e850ed756 [INLONG-8868][SDK] Optimize send data framework for
dataproxy cpp sdk (#8869)
6e850ed756 is described below
commit 6e850ed75631224dc1a0095e9b74fe3345e5abd0
Author: doleyzi <[email protected]>
AuthorDate: Sat Sep 9 18:06:53 2023 +0800
[INLONG-8868][SDK] Optimize send data framework for dataproxy cpp sdk
(#8869)
Co-authored-by: doleyzi <[email protected]>
---
.../dataproxy-sdk-cpp/src/group/send_group.cc | 249 +++++++++++++++++++++
.../dataproxy-sdk-cpp/src/group/send_group.h | 82 +++++++
.../dataproxy-sdk-cpp/src/manager/send_manager.cc | 84 +++++++
.../dataproxy-sdk-cpp/src/manager/send_manager.h | 47 ++++
4 files changed, 462 insertions(+)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
new file mode 100644
index 0000000000..319b2a9ad5
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "send_group.h"
+#include "api_code.h"
+#include "proxy_conf_manager.h"
+#include <algorithm>
+#include <random>
+
+namespace inlong {
+// Queue capacity used when the configured budget yields zero slots.
+const int kDefaultQueueSize = 20;
+
+// Builds a send group for group_id and starts its I/O thread.
+// Arms send_timer_ (first dispatch pass via PreDispatchData) and
+// update_conf_timer_ (first UpdateConf after 1 ms), then launches thread_,
+// which runs io_context_.
+SendGroup::SendGroup(std::string group_id)
+    : work_(asio::make_work_guard(io_context_)), group_id_(group_id),
+      send_idx_(0) {
+  // Queue capacity = send buffer budget / pack size. Guard against a
+  // non-positive pack_size_, which would otherwise divide by zero, and fall
+  // back to kDefaultQueueSize when the division yields no slots.
+  const auto pack_size = SdkConfig::getInstance()->pack_size_;
+  max_send_queue_num_ = 0;
+  if (pack_size > 0) {
+    max_send_queue_num_ = SdkConfig::getInstance()->send_buf_size_ / pack_size;
+  }
+  if (max_send_queue_num_ == 0) {
+    max_send_queue_num_ = kDefaultQueueSize;
+  }
+  LOG_INFO("SendGroup max_send_queue_num " << max_send_queue_num_);
+  dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_send_;
+  tcp_clients_old_ = nullptr;
+  tcp_clients_ = std::make_shared<std::vector<TcpClientTPtrT>>();
+  tcp_clients_->reserve(SdkConfig::getInstance()->max_proxy_num_);
+
+  send_timer_ = std::make_shared<asio::steady_timer>(io_context_);
+  send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_));
+  send_timer_->async_wait(
+      std::bind(&SendGroup::PreDispatchData, this, std::placeholders::_1));
+
+  update_conf_timer_ = std::make_shared<asio::steady_timer>(io_context_);
+  update_conf_timer_->expires_after(std::chrono::milliseconds(1));
+  update_conf_timer_->async_wait(
+      std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
+
+  current_proxy_vec_.reserve(SdkConfig::getInstance()->max_proxy_num_);
+  // Started last so every member is initialized before handlers can run.
+  thread_ = std::thread(&SendGroup::Run, this);
+}
+// Shuts the group down: stops io_context_, waits for the I/O thread to exit,
+// and only then cancels the timers. The handlers on the I/O thread re-arm
+// these timers, and asio timers are not safe to use from two threads at once.
+// Cancelling after join() guarantees the timers are no longer in use.
+SendGroup::~SendGroup() {
+  LOG_INFO("~SendGroup ");
+  io_context_.stop();
+  if (thread_.joinable()) {
+    thread_.join();
+  }
+  send_timer_->cancel();
+  update_conf_timer_->cancel();
+}
+// Body of thread_: drives io_context_ until it is stopped.
+// Without the catch, an exception thrown by a handler would propagate out of
+// the std::thread entry point and call std::terminate. It is logged and
+// io_context_.run() is entered again instead. The loop ends once the context
+// reports stopped(), either after stop() or after running out of work.
+void SendGroup::Run() {
+  while (!io_context_.stopped()) {
+    try {
+      io_context_.run();
+    } catch (const std::exception &e) {
+      LOG_ERROR("SendGroup io_context exception " << e.what());
+    }
+  }
+}
+// First tick of the dispatch chain: waits one more dispatch_interval_ and then
+// hands control to DispatchData, which keeps re-arming itself.
+// Any timer error (e.g. cancellation) ends the chain here.
+void SendGroup::PreDispatchData(std::error_code error) {
+  if (!error) {
+    send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_));
+    send_timer_->async_wait(
+        [this](std::error_code ec) { DispatchData(ec); });
+  }
+}
+
+// Timer handler: hands queued send buffers to idle TCP clients, then re-arms
+// send_timer_ to run again after dispatch_interval_ milliseconds.
+//
+// Each tick visits every client at most once. It starts at the round-robin
+// cursor send_idx_ and wraps around, so idle clients before the cursor are not
+// skipped. When the queue runs dry, the cursor stays on the client that found
+// it empty, so the next tick resumes there.
+void SendGroup::DispatchData(std::error_code error) {
+  if (error) {
+    // Timer cancelled or failed: stop the dispatch chain.
+    return;
+  }
+  try {
+    unique_read_lock<read_write_mutex> rdlck(remote_proxy_list_mutex_);
+    if (tcp_clients_ != nullptr && !tcp_clients_->empty()) {
+      const size_t client_num = tcp_clients_->size();
+      if (send_idx_ >= client_num) {
+        // The client list may have shrunk since the last tick.
+        send_idx_ = 0;
+      }
+      for (size_t visited = 0; visited < client_num; ++visited) {
+        auto &client = (*tcp_clients_)[send_idx_];
+        if (client->isFree()) {
+          SendBufferPtrT send_buf = PopData();
+          if (send_buf == nullptr) {
+            break;
+          }
+          client->write(send_buf);
+        }
+        send_idx_ = static_cast<uint32_t>((send_idx_ + 1) % client_num);
+      }
+    }
+  } catch (const std::exception &e) {
+    LOG_ERROR("Exception " << e.what());
+  }
+  send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_));
+  send_timer_->async_wait(
+      std::bind(&SendGroup::DispatchData, this, std::placeholders::_1));
+}
+
+// True once the pending queue holds max_send_queue_num_ buffers.
+// The previous `>` test reported "not full" at exactly the limit, which let
+// the queue grow to max_send_queue_num_ + 1.
+bool SendGroup::IsFull() { return GetQueueSize() >= max_send_queue_num_; }
+
+// Appends a buffer to the pending queue.
+// Returns SdkCode::kSendBufferFull when the queue already holds
+// max_send_queue_num_ buffers, and SdkCode::kSuccess otherwise.
+// The capacity check and the push share one critical section on mutex_, so
+// concurrent producers cannot all pass the check and overshoot the limit.
+// IsFull() is not called here: it locks mutex_ itself and std::mutex is not
+// recursive.
+uint32_t SendGroup::PushData(SendBufferPtrT send_buffer_ptr) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  if (send_buf_list_.size() >= max_send_queue_num_) {
+    return SdkCode::kSendBufferFull;
+  }
+  send_buf_list_.push(send_buffer_ptr);
+  return SdkCode::kSuccess;
+}
+
+// Periodic handler on update_conf_timer_ (every kTimerMinute ms).
+// 1. Releases the clients retired by the previous run (ClearOldTcpClients).
+// 2. Fetches the proxy list for group_id_ from proxyConfManager.
+// 3. If the list differs from current_proxy_vec_, builds at most
+//    SdkConfig::max_proxy_num_ new TcpClients and swaps them into
+//    tcp_clients_ under remote_proxy_list_mutex_.
+// 4. Closes the replaced clients, which stay in tcp_clients_old_ until the
+//    next run.
+// The timer is re-armed before any work, so every early return and any
+// caught exception still keeps the refresh chain alive.
+void SendGroup::UpdateConf(std::error_code error) {
+  if (error) {
+    return;
+  }
+  update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute));
+  update_conf_timer_->async_wait(
+      std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1));
+
+  LOG_INFO("start UpdateConf.");
+  try {
+    ClearOldTcpClients();
+
+    ProxyInfoVec new_proxy_info;
+    if (proxyConfManager::GetInstance()->GetproxyByBid(
+            group_id_, new_proxy_info) != kSuccess ||
+        new_proxy_info.empty()) {
+      return;
+    }
+
+    if (!IsConfChanged(current_proxy_vec_, new_proxy_info)) {
+      LOG_INFO("Don`t need UpdateConf. current proxy size("
+               << current_proxy_vec_.size() << ")=proxy size("
+               << new_proxy_info.size() << ")");
+      return;
+    }
+
+    // Open no more connections than configured, nor more than there are proxies.
+    size_t proxy_num = SdkConfig::getInstance()->max_proxy_num_;
+    proxy_num = std::min<size_t>(proxy_num, new_proxy_info.size());
+
+    auto tcp_clients_tmp = std::make_shared<std::vector<TcpClientTPtrT>>();
+    tcp_clients_tmp->reserve(proxy_num);
+    for (size_t i = 0; i < proxy_num; ++i) {
+      auto &proxy_tmp = new_proxy_info[i];
+      tcp_clients_tmp->push_back(std::make_shared<TcpClient>(
+          io_context_, proxy_tmp.ip(), proxy_tmp.port()));
+      LOG_INFO("new proxy info.[" << proxy_tmp.ip() << ":" << proxy_tmp.port()
+                                  << "]");
+    }
+
+    {
+      LOG_INFO("do change tcp clients.");
+      unique_write_lock<read_write_mutex> wtlck(remote_proxy_list_mutex_);
+      tcp_clients_old_ = tcp_clients_;
+      tcp_clients_ = tcp_clients_tmp;
+    }
+
+    if (tcp_clients_old_ != nullptr) {
+      for (auto &old_client : *tcp_clients_old_) {
+        old_client->DoClose();
+      }
+    }
+
+    current_proxy_vec_ = new_proxy_info;
+    LOG_INFO("Finished UpdateConf.");
+  } catch (const std::exception &e) {
+    LOG_ERROR("UpdateConf exception " << e.what());
+  }
+}
+
+// Removes and returns the oldest queued buffer, or nullptr when nothing is
+// pending. The emptiness check and the pop happen under one lock on mutex_.
+SendBufferPtrT SendGroup::PopData() {
+  std::lock_guard<std::mutex> guard(mutex_);
+  SendBufferPtrT head = nullptr;
+  if (!send_buf_list_.empty()) {
+    head = send_buf_list_.front();
+    send_buf_list_.pop();
+  }
+  return head;
+}
+
+// Number of buffers currently waiting in send_buf_list_, read under mutex_.
+uint32_t SendGroup::GetQueueSize() {
+  std::lock_guard<std::mutex> guard(mutex_);
+  const auto pending = send_buf_list_.size();
+  return static_cast<uint32_t>(pending);
+}
+
+// Decides whether new_proxy_vec describes a different proxy set than
+// current_proxy_vec.
+// - An empty new list never counts as a change, so the caller keeps its
+//   current clients.
+// - Lists of different length count as changed.
+// - Otherwise the lists count as changed if some current (ip, port) pair is
+//   missing from the new list.
+// The old last-element check compared `port ==` instead of `!=`, so a
+// port-only change at the final slot went undetected. The explicit found
+// flag removes that special case.
+bool SendGroup::IsConfChanged(ProxyInfoVec &current_proxy_vec,
+                              ProxyInfoVec &new_proxy_vec) {
+  if (new_proxy_vec.empty()) {
+    return false;
+  }
+  if (current_proxy_vec.size() != new_proxy_vec.size()) {
+    return true;
+  }
+
+  for (auto &current_bu : current_proxy_vec) {
+    bool found = false;
+    for (auto &new_bu : new_proxy_vec) {
+      if (current_bu.ip() == new_bu.ip() &&
+          current_bu.port() == new_bu.port()) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      LOG_INFO("current proxy ip." << current_bu.ip() << ":"
+                                   << current_bu.port()
+                                   << " can`t find in proxy.");
+      return true;
+    }
+  }
+  return false;
+}
+
+// True when at least one TCP client is installed, checked under the read lock
+// on remote_proxy_list_mutex_.
+bool SendGroup::IsAvailable() {
+  unique_read_lock<read_write_mutex> rdlck(remote_proxy_list_mutex_);
+  return tcp_clients_ != nullptr && !tcp_clients_->empty();
+}
+// Drops the client list retired by the previous UpdateConf, if any.
+void SendGroup::ClearOldTcpClients() {
+  if (tcp_clients_old_ == nullptr) {
+    return;
+  }
+  LOG_INFO("ClearOldTcpClients." << tcp_clients_old_->size());
+  tcp_clients_old_->clear();
+  tcp_clients_old_.reset();
+}
+} // namespace inlong
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
new file mode 100644
index 0000000000..9b929ca4d8
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef INLONG_SDK_SEND_GROUP_H
+#define INLONG_SDK_SEND_GROUP_H
+
+#include "../config/proxy_info.h"
+#include "../utils/send_buffer.h"
+#include "tcp_client.h"
+#include <queue>
+namespace inlong {
+// NOTE(review): kTimerMiSeconds is not referenced by send_group.cc; its unit
+// and intended use are not visible here.
+constexpr int kTimerMiSeconds = 10;
+// Passed to std::chrono::milliseconds as the UpdateConf period (one minute).
+constexpr int kTimerMinute = 60000;
+
+// Shorthand aliases for the asio types SendGroup uses.
+using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>;
+using IOContext = asio::io_context;
+using io_context_work = asio::executor_work_guard<IOContext::executor_type>;
+
+/**
+ * Sender for one group id. Owns a private asio io_context that thread_ runs.
+ * Two timer chains execute on that thread:
+ *  - send_timer_: PreDispatchData, then DispatchData, moves queued buffers
+ *    to idle TcpClients.
+ *  - update_conf_timer_: UpdateConf refreshes the proxy list and rebuilds
+ *    tcp_clients_.
+ * PushData/PopData/GetQueueSize lock mutex_; the client list is guarded by
+ * remote_proxy_list_mutex_.
+ */
+class SendGroup : noncopyable {
+private:
+  // Declaration order matters: io_context_ must outlive work_ and thread_.
+  IOContext io_context_;
+  // Keeps io_context_.run() from returning while no handlers are pending.
+  io_context_work work_;
+  std::thread thread_;
+  void Run();
+
+public:
+  // Active clients; UpdateConf swaps the whole vector under
+  // remote_proxy_list_mutex_.
+  std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_;
+  // Clients replaced by the last UpdateConf; released on the next one.
+  std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_old_;
+
+  // Proxy list that tcp_clients_ was built from.
+  ProxyInfoVec current_proxy_vec_;
+  // NOTE(review): not referenced by send_group.cc.
+  TcpClientTPtrVecItT current_client_;
+  // Pending buffers in FIFO order; guarded by mutex_.
+  std::queue<SendBufferPtrT> send_buf_list_;
+
+  SteadyTimerPtr send_timer_;
+  SteadyTimerPtr update_conf_timer_;
+
+  // NOTE(review): not referenced by send_group.cc.
+  read_write_mutex send_group_mutex_;
+  read_write_mutex remote_proxy_list_mutex_;
+  std::mutex mutex_;
+  std::string group_id_;
+
+  // Round-robin cursor into *tcp_clients_; in send_group.cc only
+  // DispatchData reads or writes it.
+  std::uint32_t send_idx_{0};
+  // Queue capacity, computed in the constructor from SdkConfig.
+  uint32_t max_send_queue_num_{0};
+
+  explicit SendGroup(std::string group_id);
+  ~SendGroup();
+
+  void PreDispatchData(std::error_code error);
+  void DispatchData(std::error_code error);
+  bool IsFull();
+  uint32_t PushData(SendBufferPtrT send_buffer_ptr);
+  SendBufferPtrT PopData();
+  uint32_t GetQueueSize();
+  void UpdateConf(std::error_code error);
+  bool IsConfChanged(ProxyInfoVec &current_proxy_vec,
+                     ProxyInfoVec &new_proxy_vec);
+  bool IsAvailable();
+  // Milliseconds between dispatch ticks (SdkConfig::dispatch_interval_send_).
+  uint32_t dispatch_interval_{0};
+
+  void ClearOldTcpClients();
+};
+using SendGroupPtr = std::shared_ptr<SendGroup>;
+} // namespace inlong
+
+#endif // INLONG_SDK_SEND_GROUP_H
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
new file mode 100644
index 0000000000..c5c484a2ee
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "send_manager.h"
+#include "../utils/utils.h"
+#include "bus_conf_manager.h"
+namespace inlong {
+// Creates the send groups for every group id listed in SdkConfig::group_ids_.
+SendManager::SendManager() : send_group_idx_(0) {
+  const auto &configured_ids = SdkConfig::getInstance()->group_ids_;
+  for (const auto &id : configured_ids) {
+    LOG_INFO("SendManager, group_id:"
+             << id << " send group num:"
+             << SdkConfig::getInstance()->per_group_id_thread_nums_);
+    DoAddSendGroup(id);
+  }
+}
+
+// Returns one SendGroup for group_id, picked round-robin by DoGetSendGroup().
+// When no group exists yet, asks AddSendGroup() to create one and then looks
+// it up again, so the caller gets the new group instead of nullptr.
+// Returns nullptr if the group still cannot be found, e.g. when AddSendGroup()
+// rejected it because IsBusExist() returned false.
+SendGroupPtr SendManager::GetSendGroup(const std::string &group_id) {
+  SendGroupPtr send_group_ptr = DoGetSendGroup(group_id);
+  if (send_group_ptr == nullptr) {
+    AddSendGroup(group_id);
+    send_group_ptr = DoGetSendGroup(group_id);
+  }
+  return send_group_ptr;
+}
+
+// Creates the send groups for group_id if BusConfManager knows the id.
+// Returns false (and logs) when IsBusExist() rejects the id, and true once
+// the groups exist, including when they already existed.
+bool SendManager::AddSendGroup(const std::string &group_id) {
+  if (!BusConfManager::GetInstance()->IsBusExist(group_id)) {
+    LOG_ERROR("bus is not exist." << group_id);
+    return false;
+  }
+  DoAddSendGroup(group_id);
+  return true;
+}
+
+// Registers per_group_id_thread_nums_ new SendGroup instances for group_id
+// under the map's write lock. Does nothing (with a warning) if group_id is
+// already present.
+void SendManager::DoAddSendGroup(const std::string &group_id) {
+  unique_write_lock<read_write_mutex> wtlck(group_id_2_send_group_map_rwmutex_);
+  if (group_id_2_send_group_map_.find(group_id) !=
+      group_id_2_send_group_map_.end()) {
+    LOG_WARN("send group has exist." << group_id);
+    return;
+  }
+  const auto thread_nums = SdkConfig::getInstance()->per_group_id_thread_nums_;
+  std::vector<SendGroupPtr> send_group;
+  send_group.reserve(thread_nums);
+  for (int32_t j = 0; j < thread_nums; j++) {
+    send_group.push_back(std::make_shared<SendGroup>(group_id));
+  }
+  // swap() hands the finished vector to the map without copying every
+  // shared_ptr.
+  group_id_2_send_group_map_[group_id].swap(send_group);
+}
+
+// Looks up group_id and returns one of its SendGroups, advancing the shared
+// round-robin cursor send_group_idx_. Returns nullptr when the id is unknown
+// or has no groups.
+// Only a read lock is held here, so several threads can run this at once.
+// The cursor is therefore read into a local, bounded there, and the local
+// is used for indexing. Re-reading the member could observe another
+// thread's out-of-range value and index past the end.
+SendGroupPtr SendManager::DoGetSendGroup(const std::string &group_id) {
+  unique_read_lock<read_write_mutex> rdlck(group_id_2_send_group_map_rwmutex_);
+  auto send_group_map = group_id_2_send_group_map_.find(group_id);
+  if (send_group_map == group_id_2_send_group_map_.end()) {
+    LOG_ERROR("fail to get send group, group_id:" << group_id);
+    return nullptr;
+  }
+  // Bind by reference: copying the vector would bump every refcount.
+  const std::vector<SendGroupPtr> &send_group_vec = send_group_map->second;
+  if (send_group_vec.empty()) {
+    return nullptr;
+  }
+  uint32_t idx = send_group_idx_;
+  ++idx;
+  if (idx >= send_group_vec.size()) {
+    idx = 0;
+  }
+  send_group_idx_ = idx;
+  return send_group_vec[idx];
+}
+
+} // namespace inlong
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h
new file mode 100644
index 0000000000..d6c42044f4
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.h
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef INLONG_SDK_SEND_MANAGER_H
+#define INLONG_SDK_SEND_MANAGER_H
+
+#include "../group/send_group.h"
+#include "../utils/read_write_mutex.h"
+#include <atomic>
+#include <unordered_map>
+
+namespace inlong {
+/**
+ * Maps each group id to the SendGroup instances created for it
+ * (SdkConfig::per_group_id_thread_nums_ per id) and hands them out
+ * round-robin through GetSendGroup().
+ */
+class SendManager : noncopyable {
+private:
+  read_write_mutex group_id_2_send_group_map_rwmutex_;
+  std::unordered_map<std::string, std::vector<SendGroupPtr>>
+      group_id_2_send_group_map_;
+  SendGroupPtr DoGetSendGroup(const std::string &group_id);
+  void DoAddSendGroup(const std::string &group_id);
+  // Round-robin cursor shared by all group ids. DoGetSendGroup() updates it
+  // while holding only a read lock, so several threads may touch it at once.
+  // std::atomic makes that well-defined; volatile gave no such guarantee.
+  std::atomic<uint32_t> send_group_idx_;
+
+public:
+  SendManager();
+  virtual ~SendManager() = default;
+  SendGroupPtr GetSendGroup(const std::string &group_id);
+  bool AddSendGroup(const std::string &group_id);
+};
+} // namespace inlong
+
+#endif // INLONG_SDK_SEND_MANAGER_H