This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d318f8374 [INLONG-5253][SDK] DataProxy SDK(cpp) buffer manager (#5255)
d318f8374 is described below
commit d318f83741475f54ebd9f6381eccfdc010eeca02
Author: xueyingzhang <[email protected]>
AuthorDate: Fri Jul 29 17:46:06 2022 +0800
[INLONG-5253][SDK] DataProxy SDK(cpp) buffer manager (#5255)
---
.../dataproxy-sdk-cpp/src/base/pack_queue.cc | 580 +++++++++++++++++++++
.../dataproxy-sdk-cpp/src/base/pack_queue.h | 134 +++++
.../dataproxy-sdk-cpp/src/net/buffer_pool.cc | 428 +++++++++++++++
.../dataproxy-sdk-cpp/src/net/buffer_pool.h | 120 +++++
.../dataproxy-sdk-cpp/src/net/send_buffer.h | 155 ++++++
5 files changed, 1417 insertions(+)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.cc
new file mode 100644
index 000000000..93bd542e8
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.cc
@@ -0,0 +1,580 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "pack_queue.h"
+
+#include <cstdlib>
+#include <functional>
+
+#include "proxylist_config.h"
+#include "sdk_constant.h"
+#include "sdk_core.h"
+#include "logger.h"
+#include "msg_protocol.h"
+#include "send_buffer.h"
+#include "socket_connection.h"
+#include "tc_api.h"
+#include "utils.h"
+#include "msg_protocol.h"
+
+namespace dataproxy_sdk
+{
+ PackQueue::PackQueue(const std::string &inlong_group_id, const std::string
&inlong_stream_id)
+ : cur_len_(0), inlong_group_id_(inlong_group_id),
inlong_stream_id_(inlong_stream_id), groupId_num_(0), streamId_num_(0),
msg_type_(g_config->msg_type_), data_capacity_(g_config->buf_size_)
+ {
+ data_ = new char[data_capacity_];
+ memset(data_, 0x0, data_capacity_);
+ topic_desc_ = "inlong_group_id=" + inlong_group_id_ +
"&inlong_stream_id=" + inlong_stream_id_;
+ first_use_ = Utils::getCurrentMsTime();
+ last_use_ = Utils::getCurrentMsTime();
+ data_time_ = 0;
+ }
+
+ PackQueue::~PackQueue()
+ {
+ if (data_)
+ {
+ delete[] data_;
+ data_ = nullptr;
+ }
+ }
+
+ int32_t PackQueue::sendMsg(const std::string &msg,
+ const std::string &inlong_group_id,
+ const std::string &inlong_stream_id,
+ const std::string &client_ip,
+ uint64_t report_time,
+ UserCallBack call_back)
+ {
+ std::lock_guard<std::mutex> lck(mutex_);
+
+ //pack previous
+ if (isTriggerPack(report_time, msg.size()))
+ {
+ int32_t res = writeToBuf();
+ if (res)
+ {
+ increasePackErr();
+ return res;
+ }
+ }
+
+ //write data to packqueue
+ int32_t append_ret = appendMsg(msg, client_ip, report_time, call_back);
+ if (append_ret)
+ {
+ LOG_ERROR("fail to write to pack queue, inlong_group_id: %s,
inlong_stream_id: %s", inlong_group_id.c_str(), inlong_stream_id.c_str());
+ return append_ret;
+ }
+
+ //if uneable_pack, single msg is written to packqueue and directly
sent to buf
+ if (!g_config->enable_pack_)
+ {
+ int32_t res = writeToBuf();
+ if (res)
+ {
+ increasePackErr();
+ return res;
+ }
+ }
+ return 0;
+ }
+
+ //send pack data to buf
+ int32_t PackQueue::writeToBuf()
+ {
+ if (inlong_group_id_.empty())
+ {
+ LOG_ERROR("something is wrong, check!!");
+ return SDKInvalidResult::kFailGetConn;
+ }
+ if (msg_set_.empty())
+ {
+ LOG_ERROR("no msg in msg_set, check!");
+ return SDKInvalidResult::kFailGetPackQueue;
+ }
+ auto conn = g_clusters->getSendConn(inlong_group_id_);
+ if (!conn)
+ {
+ LOG_ERROR("no avaliable connection for inlong_group_id: %s, try
later", inlong_group_id_.c_str());
+ return SDKInvalidResult::kFailGetConn;
+ }
+ auto pool = g_pools->getPool(inlong_group_id_);
+ if (!pool)
+ {
+ return SDKInvalidResult::kFailGetBufferPool;
+ }
+
+ SendBuffer *send_buf = nullptr;
+
+ int32_t res = pool->getSendBuf(send_buf);
+ if (res)
+ {
+ return res;
+ }
+ if (!send_buf)
+ {
+ LOG_ERROR("failed to get send_buf, something gets wrong,
checkout!");
+ return SDKInvalidResult::kFailGetSendBuf;
+ }
+
+ //lock sendbuf and write pack data to sendbuf
+ {
+ std::lock_guard<std::mutex> buf_lck(send_buf->mutex_);
+
+ uint32_t len = 0;
+ int32_t msg_cnt = msg_set_.size();
+ // std::string msg_bid = inlong_group_id_;
+ uint32_t uniq_id = g_send_msgid.incrementAndGet();
+ if (!packOperate(send_buf->content(), len, uniq_id) || len == 0)
+ {
+ LOG_ERROR("failed to write data to send buf from pack queue,
pool id:%d, buf id:%d", pool->poolId(), pool->writeId());
+ return SDKInvalidResult::kFailWriteToBuf;
+ }
+ send_buf->setLen(len);
+ send_buf->setMsgCnt(msg_cnt);
+ send_buf->setBid(inlong_group_id_);
+ send_buf->setTid(inlong_stream_id_);
+ send_buf->setUniqId(uniq_id);
+ send_buf->setTarget(conn);
+ send_buf->setIsPacked(true);
+
+ for (auto it : msg_set_)
+ {
+ send_buf->addUserMsg(it);
+ }
+ }
+
+ pack_num_.increment();
+ g_pools->addUid2BufPool(send_buf->uniqId(),pool);//used for ack finding
+
+ resetPackQueue();
+
+ pool->sendBufToConn(send_buf);
+ return 0;
+ }
+
+ int32_t PackQueue::appendMsg(const std::string &msg, std::string
client_ip, int64_t report_time, UserCallBack call_back)
+ {
+ //too long msg
+ if (msg.size() > g_config->ext_pack_size_)
+ {
+ LOG_ERROR("msg len (%d) more than ext_pack_size (%d)", msg.size(),
g_config->ext_pack_size_);
+ return SDKInvalidResult::kMsgTooLong;
+ }
+
+ //if datatime is illegal, fix it using current time
+ if (Utils::isLegalTime(report_time))
+ data_time_ = report_time;
+ else
+ {
+ data_time_ = Utils::getCurrentMsTime();
+ pack_redotime_cnt_.increment();
+ }
+
+ //used for callback
+ if (call_back)
+ {
+ std::string user_client_ip = client_ip;
+ int64_t user_report_time = report_time;
+ if (client_ip.empty())
+ {
+ client_ip = "127.0.0.1";
+ }
+ std::string data_pack_format_attr = "__addcol1__reptime=" +
Utils::getFormatTime(data_time_) + "&__addcol2__ip=" + client_ip;
+ msg_set_.emplace_back(std::make_shared<UserMsg>(msg, client_ip,
data_time_, call_back, data_pack_format_attr, user_client_ip,
user_report_time));
+ }
+
+ cur_len_ += msg.size() + 1; // '\n' using one byte
+
+ if (g_config->isNormalDataPackFormat())
+ {
+ cur_len_ += 4;
+ }
+ if (g_config->isAttrDataPackFormat())
+ {
+ cur_len_ += constants::kAttrLen + 8;
+ }
+
+ //update last using time
+ last_use_ = Utils::getCurrentMsTime();
+
+ return 0;
+ }
+
+ /**
+ * @description: whether trigger pack data
+ * @param {uint64_t} report_time
+ * @param {int32_t} msg_len
+ */
+ bool PackQueue::isTriggerPack(uint64_t report_time, int32_t msg_len)
+ {
+ if (0 == cur_len_ || msg_set_.empty())
+ return false;
+
+ if (!Utils::isLegalTime(report_time))
+ {
+ report_time = Utils::getCurrentMsTime();
+ }
+
+ int64_t max_pack_time_interval = 1800000; // FIXME: use user config?
+ bool time_trigger = false; //timeout trigger
+ bool len_trigger = false; //content trigger
+ if (llabs(report_time - data_time_) > max_pack_time_interval ||
+ (report_time != data_time_ && report_time / 1000 / 3600 !=
data_time_ / 1000 / 3600))
+ {
+ time_trigger = true;
+ }
+ if (msg_len + cur_len_ > g_config->pack_size_)
+ {
+ len_trigger = true;
+ }
+
+ return (time_trigger || len_trigger);
+ }
+
+ /**
+ * @description: do pack operate
+ * @param {int8*} pack_data: packed binary data
+ * @param {uint32_t&} out_len
+ * @return {*} true if pack successfully
+ */
+ bool PackQueue::packOperate(char *pack_data, uint32_t &out_len, uint32_t
uniq_id)
+ {
+ if (!pack_data)
+ {
+ LOG_ERROR("nullptr, failed to allocate memory for buf");
+ return false;
+ }
+ //add body into data_, then zip and copy to buffer
+ uint32_t idx = 0;
+ for (auto &it : msg_set_)
+ {
+ //msg>=5,body format: data_len|data
+
+ if (msg_type_ >= 5) //add data_len
+ {
+ *(uint32_t *)(&data_[idx]) = htonl(it->msg.size());
+ idx += sizeof(uint32_t);
+ }
+ //add data
+ memcpy(&data_[idx], it->msg.data(), it->msg.size());
+ idx += static_cast<uint32_t>(it->msg.size());
+
+ //add attrlen|attr
+ if (g_config->isAttrDataPackFormat())
+ {
+ *(uint32_t *)(&data_[idx]) =
htonl(it->data_pack_format_attr.size());
+ idx += sizeof(uint32_t);
+
+ memcpy(&data_[idx], it->data_pack_format_attr.data(),
it->data_pack_format_attr.size());
+ idx += static_cast<uint32_t>(it->data_pack_format_attr.size());
+ }
+
+ // msgtype = 2/3 support '\n'
+ if (msg_type_ == 2 || msg_type_ == 3)
+ {
+ data_[idx] = '\n';
+ ++idx;
+ }
+ }
+
+ //preprocess attr
+ uint32_t cnt = 1;
+ if (msg_set_.size())
+ {
+ cnt = msg_set_.size();
+ }
+
+ //pack
+ if (msg_type_ >= constants::kBinPackMethod)
+ {
+ char *bodyBegin = pack_data + sizeof(BinaryMsgHead) +
sizeof(uint32_t); //head+body_len
+ uint32_t body_len = 0;
+
+ std::string snappy_res;
+ bool isSnappy = isZipAndOperate(snappy_res, idx);
+ char real_msg_type;
+
+ if (isSnappy) // need zip
+ {
+ body_len = static_cast<uint32_t>(snappy_res.size());
+ memcpy(bodyBegin, snappy_res.data(), body_len); //copy data to
buf
+ // msg_type
+ real_msg_type = (msg_type_ | constants::kBinSnappyFlag);
+ }
+ else
+ {
+ body_len = idx;
+ memcpy(bodyBegin, data_, body_len);
+ real_msg_type = msg_type_;
+ }
+ *(uint32_t *)(&(pack_data[sizeof(BinaryMsgHead)])) =
htonl(body_len); //set bodylen
+
+ bodyBegin += body_len;
+
+ // bid_num、tid_num、ext_field、data_time、cnt、uniq
+ uint32_t char_bid_flag = 0;
+ std::string bid_tid_char;
+ uint16_t bid_num = 0, tid_num = 0;
+ if (g_config->enableCharBid() || groupId_num_ == 0 ||
streamId_num_ == 0) //using string groupid and streamid
+ {
+ bid_num = 0;
+ tid_num = 0;
+ bid_tid_char = topic_desc_;
+ char_bid_flag = 0x4;
+ }
+ else
+ {
+ bid_num = groupId_num_;
+ tid_num = streamId_num_;
+ }
+ uint16_t ext_field = (g_config->extend_field_ | char_bid_flag);
+ uint32_t data_time = data_time_ / 1000;
+
+ // attr
+ std::string attr;
+ if (g_config->enableTraceIP())
+ {
+ if (bid_tid_char.empty())
+ attr = "node1ip=" + g_config->ser_ip_ + "&rtime1=" +
std::to_string(Utils::getCurrentMsTime());
+ else
+ attr = bid_tid_char + "&node1ip=" + g_config->ser_ip_ +
"&rtime1=" + std::to_string(Utils::getCurrentMsTime());
+ }
+ else
+ {
+ attr = topic_desc_;
+ }
+ // attrlen
+ *(uint16_t *)bodyBegin = htons(attr.size());
+ bodyBegin += sizeof(uint16_t);
+ // attr
+ memcpy(bodyBegin, attr.data(), attr.size());
+ bodyBegin += attr.size();
+
+ // magic
+ *(uint16_t *)bodyBegin = htons(constants::kBinaryMagic);
+
+ uint32_t total_len = 25 + body_len + attr.size();
+
+ // header
+ char *p = pack_data;
+ *(uint32_t *)p = htonl(total_len);
+ p += 4;
+ *p = real_msg_type;
+ ++p;
+ *(uint16_t *)p = htons(bid_num);
+ p += 2;
+ *(uint16_t *)p = htons(tid_num);
+ p += 2;
+ *(uint16_t *)p = htons(ext_field);
+ p += 2;
+ *(uint32_t *)p = htonl(data_time);
+ p += 4;
+ *(uint16_t *)p = htons(cnt);
+ p += 2;
+ *(uint32_t *)p = htonl(uniq_id);
+
+ out_len = total_len + 4;
+ }
+ else
+ {
+ if (msg_type_ == 3 || msg_type_ == 2)
+ {
+ --idx;
+ }
+
+ // body whether needs zip
+ char *bodyBegin = pack_data + sizeof(ProtocolMsgHead) +
sizeof(uint32_t);
+ uint32_t body_len = 0;
+ std::string snappy_res;
+ bool isSnappy = isZipAndOperate(snappy_res, idx);
+ if (isSnappy)
+ {
+ body_len = static_cast<uint32_t>(snappy_res.size());
+ memcpy(bodyBegin, snappy_res.data(), body_len); //copy
+ }
+ else
+ {
+ body_len = idx;
+ memcpy(bodyBegin, data_, body_len);
+ }
+ *(uint32_t *)(&(pack_data[sizeof(ProtocolMsgHead)])) =
htonl(body_len); //set bodylen
+ bodyBegin += body_len;
+
+ // attr
+ std::string attr;
+ attr = topic_desc_;
+ attr += "&dt=" + std::to_string(data_time_);
+ attr += "&mid=" + std::to_string(uniq_id);
+ if (isSnappy)
+ attr += "&cp=snappy";
+ attr += "&cnt=" + std::to_string(cnt);
+ attr += "&sid=" + std::string(Utils::getSnowflakeId());
+ if (g_config->is_from_DC_)
+ { //&__addcol1_reptime=yyyymmddHHMMSS&__addcol2__ip=BBB&f=dc
+ attr += "&__addcol1_reptime=" +
Utils::getFormatTime(Utils::getCurrentMsTime()) + "&__addcol2__ip=" +
g_config->ser_ip_ + "&f=dc";
+ }
+
+ // attrlen
+ *(uint32_t *)bodyBegin = htonl(attr.size());
+ bodyBegin += sizeof(uint32_t);
+ // attr
+ memcpy(bodyBegin, attr.data(), attr.size());
+
+ // total_len
+ uint32_t total_len = 1 + 4 + body_len + 4 + attr.size();
+ *(uint32_t *)pack_data = htonl(total_len);
+ // msg_type
+ *(&pack_data[4]) = msg_type_;
+
+ LOG_TRACE("after packoperate: total_len:%d, body_len:%d,
attr_len:%d", total_len, body_len, attr.size());
+
+ out_len = total_len + 4;
+ }
+ return true;
+ }
+
+ /**
+ * @description: whether body data should be zipped; if needs, zip data
and save as res
+ * @param {string&} res
+ * @param {uint32_t} real_cur_len: data len before zip
+ * @return {*} true if needing zip
+ */
+ bool PackQueue::isZipAndOperate(std::string &res, uint32_t real_cur_len)
+ {
+ if (g_config->enable_zip_ && real_cur_len > g_config->min_zip_len_)
+ {
+ LOG_TRACE("start snappy.");
+ Utils::zipData(data_, real_cur_len, res);
+ return true;
+ }
+ else
+ return false;
+ }
+
+ void PackQueue::checkQueue(bool isLastPack)
+ {
+ if (cur_len_ == 0 || msg_set_.empty())
+ return;
+ //no timeout, and it isn't last packing
+ if (Utils::getCurrentMsTime() - first_use_ < g_config->pack_timeout_
&& !isLastPack)// FIXME:should use first_use instead of last_use?
+ return;
+ LOG_TRACE("start auto pack, inlong_group_id:%s, inlong_stream_id:%s",
inlong_group_id_.c_str(), inlong_stream_id_.c_str());
+
+ std::lock_guard<std::mutex> lck(mutex_);
+ last_use_ = Utils::getCurrentMsTime();
+ //send fail, callback and reset
+ if (writeToBuf())
+ {
+ for (auto& it : msg_set_)
+ {
+ if (it->cb) { it->cb(inlong_group_id_.data(),
inlong_stream_id_.data(), it->msg.data(), it->msg.size(), it->user_report_time,
it->user_client_ip.data()); }
+
+ }
+ resetPackQueue();
+
+ }
+ }
+
+ GlobalQueues::~GlobalQueues()
+ {
+ closeCheckSubroutine();
+ if (worker_.joinable())
+ {
+ worker_.join();
+ }
+ for (auto it : queues_)
+ {
+ it.second->checkQueue(true);
+ }
+ queues_.clear();
+ }
+
+ void GlobalQueues::startCheckSubroutine() { worker_ =
std::thread(&GlobalQueues::checkPackQueueSubroutine, this); }
+
+ void GlobalQueues::checkPackQueueSubroutine()
+ {
+ LOG_INFO("start checkPackQueue subroutine");
+ while (!exit_flag_)
+ {
+ Utils::taskWaitTime(2); // FIXME:improve pack interval
+ for (auto it : queues_)
+ {
+ it.second->checkQueue(exit_flag_);
+ }
+ }
+ LOG_WARN("exit checkPackQueue subroutine");
+ }
+
+ //get pack queue, create if not exists
+ PackQueuePtr GlobalQueues::getPackQueue(const std::string
&inlong_group_id, const std::string &inlong_stream_id)
+ {
+ std::lock_guard<std::mutex> lck(mutex_);
+
+ auto it = queues_.find(inlong_group_id + inlong_stream_id);
+ if (it != queues_.end())
+ return it->second;
+ else
+ {
+ PackQueuePtr p = std::make_shared<PackQueue>(inlong_group_id,
inlong_stream_id);
+ queues_.emplace(inlong_group_id + inlong_stream_id, p);
+ return p;
+ }
+ }
+ void GlobalQueues::printAck()
+ {
+ if (queues_.empty())
+ return;
+ for (auto &it : queues_)
+ {
+ LOG_WARN("-------> dataproxy_sdk_cpp #local:%s#%s#success send
msg:%d", g_config->ser_ip_.c_str(), it.second->topicDesc().c_str(),
+ it.second->success_num_.getAndSet(0));
+ }
+ }
+
+ void GlobalQueues::printTotalAck()
+ {
+ if (queues_.empty())
+ return;
+ for (auto &it : queues_)
+ {
+ LOG_WARN("-------> dataproxy_sdk_cpp #local:%s#%s#total success
msg:%d", g_config->ser_ip_.c_str(), it.second->topicDesc().c_str(),
+ it.second->total_success_num_.get());
+ }
+ }
+
+ void GlobalQueues::showState()
+ {
+ if (1 == user_exit_flag.get())
+ return;
+ uint32_t total_pack = 0;
+ for (auto &it : queues_)
+ {
+ uint32_t pack = it.second->pack_num_.getAndSet(0);
+ total_pack += pack;
+ LOG_DEBUG("------->toipc:%s, pack_num:%d",
it.second->topicDesc().c_str(), pack);
+ }
+ LOG_DEBUG("------->total_pack:%d", total_pack);
+
+ g_pools->showState();
+ g_executors->showState();
+ }
+
+} // namespace dataproxy_sdk
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.h
new file mode 100644
index 000000000..dccf09b97
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.h
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_PACK_QUEUE_H_
+#define DATAPROXY_SDK_BASE_PACK_QUEUE_H_
+
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <unordered_map>
+
+#include "atomic.h"
+#include "buffer_pool.h"
+#include "sdk_core.h"
+#include "client_config.h"
+#include "noncopyable.h"
+#include "user_msg.h"
+namespace dataproxy_sdk
+{
+// groupid:packqueue=1:1
+class PackQueue
+{
+ private:
+ char* data_;
+ std::vector<UserMsgPtr> msg_set_;
+ uint32_t data_capacity_; // pack_size+extend_pack_size
+ uint32_t cur_len_; //data+additional attr
+ AtomicInt pack_redotime_cnt_; //msg_cnt, report time is illegal
+ AtomicInt pack_err_; //pack err metrics
+ uint64_t first_use_;
+ uint64_t last_use_;
+ uint64_t data_time_; //data time
+ std::string inlong_group_id_;
+ std::string inlong_stream_id_;
+ uint16_t groupId_num_;
+ uint16_t streamId_num_;
+ std::string topic_desc_; // inlong_group_id=xxx&inlong_stream_id=xxx
+ uint32_t msg_type_;
+ mutable std::mutex mutex_;
+
+ public:
+ AtomicUInt success_num_; //ack msg num in every one min
+ AtomicUInt total_success_num_; //total ack msg num
+
+ AtomicUInt pack_num_; //package num in every one min
+
+ public:
+ PackQueue(const std::string& inlong_group_id, const std::string&
inlong_stream_id);
+ ~PackQueue();
+
+ int32_t sendMsg(const std::string& msg,
+ const std::string& inlong_group_id,
+ const std::string& inlong_stream_id,
+ const std::string& client_ip,
+ uint64_t report_time,
+ UserCallBack cb);
+ bool packOperate(char* pack_data, uint32_t& out_len, uint32_t uniq_id);
+ void checkQueue(bool isLastPack); //if it's last pack, pack and send the
rest data
+ uint32_t curLen() const { return cur_len_; }
+ void setCurLen(const uint32_t& cur_len) { cur_len_ = cur_len; }
+ uint64_t firstUse() const { return first_use_; }
+ void setFirstUse(const uint64_t& first_use) { first_use_ = first_use; }
+ uint64_t lastUse() const { return last_use_; }
+ void setLastUse(const uint64_t& last_use) { last_use_ = last_use; }
+ uint64_t dataTime() const { return data_time_; }
+ void setDataTime(const uint64_t& data_time) { data_time_ = data_time; }
+ char* data() const { return data_; }
+
+ std::string inlong_group_id() const { return inlong_group_id_; }
+ void setBid(const std::string& inlong_group_id) { inlong_group_id_ =
inlong_group_id; }
+
+ void increasePackErr() { pack_err_.increment(); }
+
+ std::string topicDesc() const { return topic_desc_; }
+
+ private:
+ bool isTriggerPack(uint64_t msg_time, int32_t msg_len);
+ int32_t writeToBuf();
+ int32_t appendMsg(const std::string& msg, std::string client_ip, int64_t
report_time, UserCallBack call_back);
+ bool isZipAndOperate(std::string& res, uint32_t real_cur_len);
+ inline void resetPackQueue()
+ {
+ memset(data_, 0x0, data_capacity_);
+ cur_len_ = 0;
+ msg_set_.clear();
+ }
+};
+
+using PackQueuePtr = std::shared_ptr<PackQueue>;
+
+// all packqueues
+class GlobalQueues : noncopyable
+{
+ private:
+ std::unordered_map<std::string, PackQueuePtr> queues_;
+ mutable std::mutex mutex_;
+ std::thread worker_; //pack thread
+ bool exit_flag_;
+
+ public:
+ GlobalQueues() : exit_flag_(false){};
+ virtual ~GlobalQueues();
+ void startCheckSubroutine();
+ virtual PackQueuePtr getPackQueue(const std::string& inlong_group_id,
const std::string& inlong_stream_id);
+ void closeCheckSubroutine() { exit_flag_ = true; }
+ void printAck();
+ void printTotalAck();
+
+ void showState();
+
+ private:
+ void checkPackQueueSubroutine();
+};
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_PACK_QUEUE_H_
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.cc
new file mode 100644
index 000000000..6604b625d
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.cc
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "buffer_pool.h"
+
+#include <chrono>
+#include <memory>
+#include <vector>
+
+#include "proxylist_config.h"
+#include "sdk_core.h"
+#include "logger.h"
+#include "send_buffer.h"
+#include "socket_connection.h"
+#include "tc_api.h"
+
+namespace dataproxy_sdk
+{
+ BufferPool::BufferPool(uint32_t pool_id, uint32_t buf_num, uint32_t
buf_size)
+ : pool_id_(pool_id), buf_num_(buf_num), read_(0), write_(0),
current_use_(0), pool_mutex_(),
executor_(std::make_shared<ExecutorThread>(pool_id))
+ {
+ buffers_.reserve(buf_num_);
+ for (int i = 0; i < buf_num_; i++)
+ {
+ buffers_.push_back(new SendBuffer(buf_size));
+ }
+ }
+
+ BufferPool::~BufferPool()
+ {
+ for (auto it : buffers_)
+ {
+ delete it;
+ }
+ buffers_.clear();
+ LOG_DEBUG("free send buffer memory, pool(id:%d)", pool_id_);
+ }
+
+ bool BufferPool::isAvaliable()
+ {
+ return current_use_ < buf_num_;
+ }
+
+ int32_t BufferPool::getSendBuf(SendBuffer *&send_buf)
+ {
+ std::lock_guard<std::mutex> lck(pool_mutex_);
+
+ if (current_use_ >= buf_num_)
+ {
+ LOG_ERROR("buffer pool(id:%d) is full", pool_id_);
+ return SDKInvalidResult::kBufferPoolFull;
+ }
+
+ while (buffers_[write_]->isUsed())
+ {
+ write_ = (write_ + 1) % buf_num_;
+ }
+
+ send_buf = buffers_[write_];
+ // ++write_;
+ if (!send_buf)
+ {
+ LOG_ERROR("failed to get send buf, pool(id:%d)", pool_id_);
+ return SDKInvalidResult::kFailGetSendBuf;
+ }
+
+ send_buf->setIsUsed(true);
+ ++current_use_;
+
+ return 0;
+ }
+
+ int32_t BufferPool::sendBufToConn(SendBuffer *&send_buf)
+ {
+ std::lock_guard<std::mutex> pool_lck(pool_mutex_);
+
+ std::lock_guard<std::mutex> buf_lck(send_buf->mutex_);
+
+ auto self = shared_from_this();
+ if (g_config->retry_num_ > 0 && g_config->retry_interval_ > 0)
//resend if need
+ {
+ executor_->postTask([self, this, send_buf]
+ {
+ send_buf->timeout_timer_ = executor_->createSteadyTimer();
+
send_buf->timeout_timer_->expires_after(std::chrono::milliseconds(g_config->retry_interval_));
+
send_buf->timeout_timer_->async_wait(std::bind(&BufferPool::RetryHandler,
shared_from_this(), std::placeholders::_1, send_buf)); });
+ }
+ if (send_buf->target()->isStop())
+ {
+ auto new_conn =
g_clusters->createActiveConn(send_buf->inlong_group_id(), pool_id_);
+ if (!new_conn)
+ {
+ LOG_WARN("first send buf, fail to create new conn for sendbuf,
inlong_group_id:%s, inlong_stream_id:%s", send_buf->inlong_group_id().c_str(),
send_buf->inlong_stream_id().c_str());
+ send_buf->fail_create_conn_.increment();
+ // send_buf->increaseRetryNum();
+ // send_err_.increment();
+ already_send_buf_list_[send_buf->uniqId()] = send_buf;
//preparing for next sending
+ return -1;
+ }
+ send_buf->setTarget(new_conn);
+ }
+ send_buf->target()->sendBuf(send_buf); //do send operation
+ send_buf->increaseRetryNum();
+ LOG_TRACE("request buf(id:%d) send to connection%s first time,
inlong_group_id:%s, inlong_stream_id:%s", send_buf->uniqId(),
send_buf->target()->getRemoteInfo().c_str(),
+ send_buf->inlong_group_id().c_str(),
send_buf->inlong_stream_id().c_str());
+
+ already_send_buf_list_[send_buf->uniqId()] = send_buf; //add buf uid
into sent list
+
+ send_total_.increment();
+
+ //update metrics
+ has_send_buf_.increment();
+ waiting_ack_.increment();
+
+ LOG_TRACE("success to write buf pool, pool(id:%d), buf(uid:%d),
inlong_group_id:%s, inlong_stream_id:%s", pool_id_, send_buf->uniqId(),
send_buf->inlong_group_id().c_str(),
+ send_buf->inlong_stream_id().c_str());
+ return 0;
+ }
+
+ void BufferPool::RetryHandler(const std::error_code &ec, SendBuffer *buf)
+ {
+ if (ec) // timer is cancelled, two cases:
1.ackbuf->sendbuf.reset;2.msg_type=2,conn.doWrite->cancel
+ {
+ if (g_config->msg_type_ == 2)
+ {
+ LOG_TRACE("msg_type is 2, no need ackmsg, clear buf(uid:%d)
directly", buf->uniqId());
+ ackBuf(buf->uniqId());
+ }
+ return;
+ }
+
+ if (!buf->isUsed())
+ {
+ return;
+ } // buf is already acked before retry
+
+ std::lock_guard<std::mutex> buf_lck(buf->mutex_);
+
+ if (buf->getAlreadySend() == 2)
+ {
+ LOG_INFO("buf(id:%d, inlong_group_id:%s, inlong_stream_id:%s)
ackmsg timeout, send %d times(max retry_num:%d)", buf->uniqId(),
buf->inlong_group_id().c_str(),
+ buf->inlong_stream_id().c_str(), buf->getAlreadySend(),
g_config->retry_num_);
+ }
+ else
+ {
+ LOG_DEBUG("buf(id:%d, inlong_group_id:%s, inlong_stream_id:%s)
ackmsg timeout, send %d times(max retry_num:%d)", buf->uniqId(),
buf->inlong_group_id().c_str(),
+ buf->inlong_stream_id().c_str(), buf->getAlreadySend(),
g_config->retry_num_);
+ }
+
+ // max_retry_num, usercallback
+ if (buf->getAlreadySend() >= g_config->retry_num_ ||
buf->fail_create_conn_.get() >= constants::kMaxRetryConnection)
+ {
+ LOG_WARN("fail to send buf(id:%d, inlong_group_id:%s,
inlong_stream_id:%s), has send max_retry_num(%d) times, start usercallback",
buf->uniqId(), buf->inlong_group_id().c_str(),
+ buf->inlong_stream_id().c_str(), g_config->retry_num_);
+ buf->doUserCallBack();
+ buf->reset();
+ }
+ else // ack timeout, resend
+ {
+ if (!buf->target() || buf->target()->isStop())
+ {
+ auto new_conn =
g_clusters->createActiveConn(buf->inlong_group_id(), pool_id_); // TODO: should
improve as choosing from active conn instread of creating?
+
+ if (!new_conn) //create conn error, waiting for next creating
+ {
+ LOG_INFO("fail to create new conn to send buf,
inlong_group_id:%s, inlong_stream_id:%s", buf->inlong_group_id().c_str(),
buf->inlong_stream_id().c_str());
+ buf->fail_create_conn_.increment();
+
buf->timeout_timer_->expires_after(std::chrono::milliseconds(g_config->retry_interval_));
+
buf->timeout_timer_->async_wait(std::bind(&BufferPool::RetryHandler,
shared_from_this(), std::placeholders::_1, buf));
+ return;
+ }
+ buf->setTarget(new_conn);
+ }
+
+ buf->target()->sendBuf(buf);
+ buf->increaseRetryNum();
+
buf->timeout_timer_->expires_after(std::chrono::milliseconds(g_config->retry_interval_));
+
buf->timeout_timer_->async_wait(std::bind(&BufferPool::RetryHandler,
shared_from_this(), std::placeholders::_1, buf));
+ }
+ }
+
+ void BufferPool::ackBufHelper(uint32_t uniq_id)
+ {
+ std::lock_guard<std::mutex> lck(pool_mutex_);
+ if (already_send_buf_list_.find(uniq_id) ==
already_send_buf_list_.end())
+ {
+ LOG_ERROR("no buf(uid:%d) in already_send_buf_list", uniq_id);
+ return;
+ }
+ auto &buf2ack = already_send_buf_list_[uniq_id];
+
+ std::lock_guard<std::mutex> buf_lck(buf2ack->mutex_);
+
+ int32_t msg_cnt = buf2ack->msgCnt();
+ success_ack_.add(msg_cnt);
+
+ //update packqueue metrics
+ if(g_queues){
+ auto packqueue =
g_queues->getPackQueue(buf2ack->inlong_group_id(), buf2ack->inlong_stream_id());
+ packqueue->success_num_.add(msg_cnt);
+ packqueue->total_success_num_.add(msg_cnt);
+ }
+
+ already_send_buf_list_[uniq_id]->reset();
+ already_send_buf_list_.erase(uniq_id);
+ --current_use_;
+
+ waiting_ack_.decrement();
+ has_ack_.increment();
+ LOG_TRACE("pool(id:%d) success ack msg cumulative num: %d", pool_id_,
success_ack_.get());
+ }
+
+ void BufferPool::showState(const std::string &inlong_group_id)
+ {
+ if (inlong_group_id.empty())
+ {
+ LOG_INFO("STATE|pool_id:%d, current_use:%d(total:%d),
invoke_send_buf:%d, has_ack:%d, waiting_ack:%d", pool_id_, currentUse(),
+ g_config->bufNum(), has_send_buf_.get(), has_ack_.get(),
waiting_ack_.get());
+ }
+ else
+ {
+ LOG_INFO("STATE|inlong_group_id:%s, pool_id:%d,
current_use:%d(total:%d), invoke_send_buf:%d, has_ack:%d, waiting_ack:%d",
inlong_group_id.c_str(), pool_id_, currentUse(),
+ g_config->bufNum(), has_send_buf_.get(), has_ack_.get(),
waiting_ack_.get());
+ }
+ }
+
+ void BufferPool::close()
+ {
+ if (executor_ != nullptr)
+ {
+ executor_->close();
+ }
+ executor_.reset();
+ }
+
+ TotalPools::TotalPools() : next_(0), mutex_()
+ {
+ if (g_config->enable_groupId_isolation_) // different groupid data use
different bufpool
+ {
+ for (int32_t i = 0; i < g_config->inlong_group_ids_.size(); i++)
// create a bufpool for ervery groupidS
+ {
+ std::vector<BufferPoolPtr> bid_pool;
+ bid_pool.reserve(g_config->buffer_num_per_groupId_);
+ for (int32_t j = 0; j < g_config->buffer_num_per_groupId_; j++)
+ {
+ bid_pool.push_back(std::make_shared<BufferPool>(j,
g_config->bufNum(), g_config->buf_size_));
+ }
+
+ bid2pool_map_[g_config->inlong_group_ids_[i]] = bid_pool;
+ bid2next_[g_config->inlong_group_ids_[i]] = 0;
+ }
+ }
+ else // round-robin
+ {
+ pools_.reserve(g_config->shared_buf_nums_);
+ for (int i = 0; i < g_config->shared_buf_nums_; i++)
+ {
+ pools_.push_back(std::make_shared<BufferPool>(i,
g_config->bufNum(), g_config->buf_size_));
+ }
+ }
+ }
+
+ BufferPoolPtr TotalPools::getPool(const std::string &inlong_group_id)
+ {
+ if (g_config->enable_groupId_isolation_) // bid隔离
+ {
+ auto bid_pool = bid2pool_map_.find(inlong_group_id);
+ if (bid_pool == bid2pool_map_.end() || bid_pool->second.empty())
+ {
+ LOG_ERROR("fail to get bufferpool, inlong_group_id:%s",
inlong_group_id.c_str());
+ return nullptr;
+ }
+ if (bid2next_.find(inlong_group_id)==bid2next_.end())
+ {
+ bid2next_[inlong_group_id]=0;
+ }
+
+ auto& pool_set=bid_pool->second;
+ int32_t idx=0;
+ for (int32_t i = 0; i < pool_set.size(); i++)
+ {
+ idx = (bid2next_[inlong_group_id]++) % pool_set.size();
+ if (pool_set[idx]->isAvaliable())
+ {
+ return pool_set[idx];
+ }
+
+ }
+
+ return nullptr;
+ }
+ else
+ {
+ if (pools_.empty())
+ {
+ LOG_ERROR("fail to get bufferpool, allocate error in tc_init");
+ return nullptr;
+ }
+
+ int32_t idx=0;
+ for (int32_t i = 0; i < pools_.size(); i++)
+ {
+ idx = (next_++) % pools_.size();
+ if (pools_[idx]->isAvaliable())
+ {
+ return pools_[idx];
+ }
+
+ }
+
+ return nullptr;
+
+ }
+ }
+
+ bool TotalPools::isPoolAvailable(const std::string &inlong_group_id)
+ {
+
+ if (g_config->enable_groupId_isolation_) // bid_isolation
+ {
+ auto bid_pool = bid2pool_map_.find(inlong_group_id);
+ if (bid_pool == bid2pool_map_.end())
+ {
+ LOG_ERROR("no buffer allocated to inlong_group_id:%s, check
config", inlong_group_id.c_str());
+ return false;
+ }
+ for (int i = 0; i < bid_pool->second.size(); i++)
+ {
+ if (bid_pool->second[i]->isAvaliable())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+ else // rr
+ {
+ for (int i = 0; i < pools_.size(); i++)
+ {
+ if (pools_[i]->isAvaliable())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ void TotalPools::addUid2BufPool(uint32_t uniqId, BufferPoolPtr& bpr){
+ std::lock_guard<std::mutex> lck(mutex_);
+ uid2buf_pool_[uniqId]=bpr;
+ }
+
+ BufferPoolPtr TotalPools::getUidBufPool(uint32_t uniqId){
+ std::lock_guard<std::mutex> lck(mutex_);
+ auto iter=uid2buf_pool_.find(uniqId);
+ if(iter==uid2buf_pool_.end()){
+ return nullptr;
+ }
+ uid2buf_pool_.erase(iter); //TODO: need add it into bufpool after ack
operation
+
+ return iter->second;
+
+ }
+
+ void TotalPools::showState()
+ {
+ if (g_config->enable_groupId_isolation_)
+ {
+ for (auto &bid_pool : bid2pool_map_)
+ {
+ showStateHelper(bid_pool.first, bid_pool.second);
+ }
+ }
+ else
+ {
+ showStateHelper("", pools_);
+ }
+ }
+
+ void TotalPools::showStateHelper(const std::string &inlong_group_id,
std::vector<BufferPoolPtr> &pools)
+ {
+ for (auto &pool : pools)
+ {
+ pool->showState(inlong_group_id);
+ }
+ }
+
+ void TotalPools::close()
+ {
+ // waiting for sending the rest data
+ if (uid2buf_pool_.size())
+ {
+ LOG_INFO("waiting for 10s to ack remaining msg");
+ std::this_thread::sleep_for(std::chrono::seconds(10));
+ }
+
+ for (auto &pool : pools_)
+ {
+ pool->close();
+ }
+
+ for (auto &bid_pool : bid2pool_map_)
+ {
+ for (auto &pool : bid_pool.second)
+ {
+ pool->close();
+ }
+ }
+ }
+
+} // namespace dataproxy_sdk
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.h
new file mode 100644
index 000000000..849522c38
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.h
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_NET_BUFFER_POOL_H_
+#define DATAPROXY_SDK_NET_BUFFER_POOL_H_
+
+#include <deque>
+#include <mutex>
+#include <system_error>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+#include "atomic.h"
+#include "sdk_core.h"
+#include "noncopyable.h"
+#include "pack_queue.h"
+#include "recv_buffer.h"
+#include "executor_thread_pool.h"
+
+namespace dataproxy_sdk
+{
+ class SendBuffer;
+ class BufferPool : noncopyable, public
std::enable_shared_from_this<BufferPool>
+ {
+ private:
+ uint32_t pool_id_;
+ uint32_t buf_num_; //sendbuf count
+ uint32_t read_, write_;
+ int32_t current_use_;//sendbuf being used count
+ std::vector<SendBuffer *> buffers_; // allocate memory
+ std::unordered_map<uint32_t, SendBuffer *> already_send_buf_list_;
//key:uniq_id_, sent but not ack
+ mutable std::mutex pool_mutex_;
+ ExecutorThreadPtr executor_;
+
+ AtomicInt send_total_; //send buf num, including resend
+ AtomicInt send_err_; //fail send buf num, including resend
+ AtomicInt resend_total_; //total resent package
+ AtomicInt success_ack_; //total ack msg
+
+ public:
+ // bufusage metrics
+ AtomicUInt waiting_ack_;
+ AtomicUInt has_ack_;
+ AtomicUInt has_send_buf_; //sent to conn
+
+ public:
+ BufferPool(uint32_t pool_id, uint32_t buf_num, uint32_t buf_size);
+ virtual ~BufferPool();
+
+ bool isAvaliable(); //is bufpool avaliable
+ virtual int32_t getSendBuf(SendBuffer *&send_buf);
+ int32_t sendBufToConn(SendBuffer *&send_buf);
+
+ void ackBufHelper(uint32_t uinq_id); // ack
+ void ackBuf(uint32_t uniq_id) {
executor_->postTask(std::bind(&BufferPool::ackBufHelper, shared_from_this(),
uniq_id)); }
+
+ uint32_t poolId() const { return pool_id_; }
+
+ uint32_t writeId() const { return write_; }
+
+ int32_t currentUse() const { return current_use_; }
+
+ void showState(const std::string &inlong_group_id);
+
+ void close();
+
+ private:
+ void RetryHandler(const std::error_code &ec, SendBuffer *buf);
+ };
+
+ using BufferPoolPtr = std::shared_ptr<BufferPool>;
+
+ class TotalPools : noncopyable
+ {
+ private:
+ // round-robin
+ std::vector<BufferPoolPtr> pools_;
+ int32_t next_;
+
+ // bid_isolation
+ std::unordered_map<std::string, std::vector<BufferPoolPtr>> bid2pool_map_;
+ std::unordered_map<std::string, int32_t> bid2next_;
+
+ std::unordered_map<uint32_t, BufferPoolPtr> uid2buf_pool_;// <sent buf
uid, bufpool>
+
+ mutable std::mutex mutex_;
+
+ void close(); // shutdown buffer thread
+ void showStateHelper(const std::string &inlong_group_id,
std::vector<BufferPoolPtr> &pools);
+
+ public:
+ TotalPools();
+ virtual ~TotalPools() { close(); } // FIXME: need other operations?
+ bool isPoolAvailable(const std::string &inlong_group_id);
+ virtual BufferPoolPtr getPool(const std::string &inlong_group_id);
+ void addUid2BufPool(uint32_t uniqId, BufferPoolPtr& bpr);
+ BufferPoolPtr getUidBufPool(uint32_t uniqId);
+ void showState();
+ };
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_NET_BUFFER_POOL_H_
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/send_buffer.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/send_buffer.h
new file mode 100644
index 000000000..673c89186
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/send_buffer.h
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_NET_SEND_BUFFER_H_
+#define DATAPROXY_SDK_NET_SEND_BUFFER_H_
+
+#include <mutex>
+#include <string>
+
+#include "atomic.h"
+#include "sdk_core.h"
+// #include "executor_thread_pool.h"
+#include "logger.h"
+#include "noncopyable.h"
+#include "user_msg.h"
+// #include "socket_connection.h"
+
+namespace dataproxy_sdk
+{
+class SendBuffer : noncopyable
+{
+ private:
+ uint32_t uniq_id_;
+ bool is_used_;
+ bool is_packed_; //is packed completed
+ char* content_; //send data
+ uint32_t size_; //buf_size
+ int32_t msg_cnt_;
+ uint32_t len_; //send data len
+ std::string inlong_group_id_;
+ std::string inlong_stream_id_;
+ AtomicInt already_send_;
+ uint64_t first_send_time_; //ms
+ uint64_t latest_send_time_; //ms
+ ConnectionPtr target_; //send conn
+ std::vector<UserMsgPtr> user_msg_set_;
+
+ public:
+ std::mutex mutex_;
+ SteadyTimerPtr timeout_timer_; //timeout, resend
+ AtomicInt fail_create_conn_; //create conn fail count
+
+ public:
+ SendBuffer(uint32_t size)
+ : is_used_(false)
+ , is_packed_(false)
+ , mutex_()
+ , msg_cnt_(0)
+ , len_(0)
+ , inlong_group_id_()
+ , inlong_stream_id_()
+ , first_send_time_(0)
+ , latest_send_time_(0)
+ , target_(nullptr)
+ , uniq_id_(0)
+ , timeout_timer_(nullptr)
+ , size_(size)
+ {
+ content_ = new char[size];
+ if (content_)
+ {
+ memset(content_, 0x0, size);
+ }
+ }
+ ~SendBuffer()
+ {
+ if (content_) { delete[] content_; }
+ content_ = nullptr;
+ }
+
+ char* content() { return content_; }
+ int32_t msgCnt() const { return msg_cnt_; }
+ void setMsgCnt(const int32_t& msg_cnt) { msg_cnt_ = msg_cnt; }
+ uint32_t len() { return len_; }
+ void setLen(const uint32_t len) { len_ = len; }
+ std::string inlong_group_id() { return inlong_group_id_; }
+ std::string inlong_stream_id() { return inlong_stream_id_; }
+ void setBid(const std::string& inlong_group_id) { inlong_group_id_ =
inlong_group_id; }
+ void setTid(const std::string& inlong_stream_id) { inlong_stream_id_ =
inlong_stream_id; }
+ uint64_t firstSendTime() const { return first_send_time_; }
+ void setFirstSendTime(const uint64_t& first_send_time) { first_send_time_
= first_send_time; }
+ uint64_t latestSendTime() const { return latest_send_time_; }
+ void setLatestSendTime(const uint64_t& latest_send_time) {
latest_send_time_ = latest_send_time; }
+
+ ConnectionPtr target() const { return target_; }
+ void setTarget(ConnectionPtr& target) { target_ = target; }
+
+ inline void increaseRetryNum() { already_send_.increment(); }
+ inline int32_t getAlreadySend() { return already_send_.get(); }
+
+ uint32_t uniqId() const { return uniq_id_; }
+ void setUniqId(const uint32_t& uniq_id) { uniq_id_ = uniq_id; }
+
+ void addUserMsg(UserMsgPtr u_msg) { user_msg_set_.push_back(u_msg); }
+ void doUserCallBack()
+ {
+ LOG_TRACE("failed to send msg, start user call_back");
+ for (auto it : user_msg_set_)
+ {
+ if (it->cb) { it->cb(inlong_group_id_.data(),
inlong_stream_id_.data(), it->msg.data(), it->msg.size(), it->user_report_time,
it->user_client_ip.data()); }
+ }
+ }
+
+ void reset()
+ {
+ uint32_t record_uid = uniq_id_; // for debug
+
+ uniq_id_ = 0;
+ is_used_ = false;
+ is_packed_ = false;
+ memset(content_, 0x0, size_);
+ msg_cnt_ = 0;
+ len_ = 0;
+ inlong_group_id_ = "";
+ inlong_stream_id_ = "";
+ already_send_.getAndSet(0);
+ first_send_time_ = 0;
+ latest_send_time_ = 0;
+ target_ = nullptr;
+ if (timeout_timer_)
+ {
+ timeout_timer_->cancel();
+ timeout_timer_ = nullptr;
+ }
+ user_msg_set_.clear();
+ fail_create_conn_.getAndSet(0);
+ LOG_TRACE("reset senfbuf(uid:%d) successfully", record_uid);
+ }
+
+ bool isPacked() const { return is_packed_; }
+ void setIsPacked(bool is_packed) { is_packed_ = is_packed; }
+
+ bool isUsed() const { return is_used_; }
+ void setIsUsed(bool is_used) { is_used_ = is_used; }
+};
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_NET_SEND_BUFFER_H_
\ No newline at end of file