This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d318f8374 [INLONG-5253][SDK] DataProxy SDK(cpp) buffer manager (#5255)
d318f8374 is described below

commit d318f83741475f54ebd9f6381eccfdc010eeca02
Author: xueyingzhang <[email protected]>
AuthorDate: Fri Jul 29 17:46:06 2022 +0800

    [INLONG-5253][SDK] DataProxy SDK(cpp) buffer manager (#5255)
---
 .../dataproxy-sdk-cpp/src/base/pack_queue.cc       | 580 +++++++++++++++++++++
 .../dataproxy-sdk-cpp/src/base/pack_queue.h        | 134 +++++
 .../dataproxy-sdk-cpp/src/net/buffer_pool.cc       | 428 +++++++++++++++
 .../dataproxy-sdk-cpp/src/net/buffer_pool.h        | 120 +++++
 .../dataproxy-sdk-cpp/src/net/send_buffer.h        | 155 ++++++
 5 files changed, 1417 insertions(+)

diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.cc
new file mode 100644
index 000000000..93bd542e8
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.cc
@@ -0,0 +1,580 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "pack_queue.h"
+
+#include <cstdlib>
+#include <functional>
+
+#include "proxylist_config.h"
+#include "sdk_constant.h"
+#include "sdk_core.h"
+#include "logger.h"
+#include "msg_protocol.h"
+#include "send_buffer.h"
+#include "socket_connection.h"
+#include "tc_api.h"
+#include "utils.h"
+#include "msg_protocol.h"
+
+namespace dataproxy_sdk
+{
// Per-(inlong_group_id, inlong_stream_id) pack queue. Pre-allocates the
// staging buffer (capacity taken from g_config->buf_size_) that
// packOperate() later serializes queued messages into.
PackQueue::PackQueue(const std::string &inlong_group_id, const std::string &inlong_stream_id)
    : cur_len_(0), inlong_group_id_(inlong_group_id), inlong_stream_id_(inlong_stream_id), groupId_num_(0), streamId_num_(0), msg_type_(g_config->msg_type_), data_capacity_(g_config->buf_size_)
{
    data_ = new char[data_capacity_];
    memset(data_, 0x0, data_capacity_); // start from a zeroed staging area
    // Attribute string reused for every pack: "inlong_group_id=xxx&inlong_stream_id=xxx"
    topic_desc_ = "inlong_group_id=" + inlong_group_id_ + "&inlong_stream_id=" + inlong_stream_id_;
    first_use_ = Utils::getCurrentMsTime();
    last_use_ = Utils::getCurrentMsTime();
    data_time_ = 0;
}
+
+    PackQueue::~PackQueue()
+    {
+        if (data_)
+        {
+            delete[] data_;
+            data_ = nullptr;
+        }
+    }
+
// Entry point for one user message: may first flush (pack) the queued
// batch when this message would overflow the pack size or cross the time
// window, then stages the message; when packing is disabled every message
// is flushed to the send buffer immediately.
// Returns 0 on success or an SDKInvalidResult error code.
int32_t PackQueue::sendMsg(const std::string &msg,
                           const std::string &inlong_group_id,
                           const std::string &inlong_stream_id,
                           const std::string &client_ip,
                           uint64_t report_time,
                           UserCallBack call_back)
{
    std::lock_guard<std::mutex> lck(mutex_);

    // pack the previously queued batch first if triggered
    if (isTriggerPack(report_time, msg.size()))
    {
        int32_t res = writeToBuf();
        if (res)
        {
            increasePackErr();
            return res;
        }
    }

    // stage the new message in the pack queue
    int32_t append_ret = appendMsg(msg, client_ip, report_time, call_back);
    if (append_ret)
    {
        LOG_ERROR("fail to write to pack queue, inlong_group_id: %s, inlong_stream_id: %s", inlong_group_id.c_str(), inlong_stream_id.c_str());
        return append_ret;
    }

    // if packing is disabled, a single msg is written to the pack queue and
    // directly sent to the buffer
    if (!g_config->enable_pack_)
    {
        int32_t res = writeToBuf();
        if (res)
        {
            increasePackErr();
            return res;
        }
    }
    return 0;
}
+
// Serialize the queued batch into a pool SendBuffer and dispatch it to a
// connection. Returns 0 on success, otherwise an SDKInvalidResult code.
int32_t PackQueue::writeToBuf()
{
    if (inlong_group_id_.empty())
    {
        LOG_ERROR("something is wrong, check!!");
        return SDKInvalidResult::kFailGetConn;
    }
    if (msg_set_.empty())
    {
        LOG_ERROR("no msg in msg_set, check!");
        return SDKInvalidResult::kFailGetPackQueue;
    }
    auto conn = g_clusters->getSendConn(inlong_group_id_);
    if (!conn)
    {
        LOG_ERROR("no avaliable connection for inlong_group_id: %s, try later", inlong_group_id_.c_str());
        return SDKInvalidResult::kFailGetConn;
    }
    auto pool = g_pools->getPool(inlong_group_id_);
    if (!pool)
    {
        return SDKInvalidResult::kFailGetBufferPool;
    }

    SendBuffer *send_buf = nullptr;

    int32_t res = pool->getSendBuf(send_buf);
    if (res)
    {
        return res;
    }
    if (!send_buf)
    {
        LOG_ERROR("failed to get send_buf, something gets wrong, checkout!");
        return SDKInvalidResult::kFailGetSendBuf;
    }

    // lock the sendbuf and serialize the packed batch into it
    {
        std::lock_guard<std::mutex> buf_lck(send_buf->mutex_);

        uint32_t len = 0;
        int32_t msg_cnt = msg_set_.size();
        // std::string msg_bid = inlong_group_id_;
        uint32_t uniq_id = g_send_msgid.incrementAndGet();
        if (!packOperate(send_buf->content(), len, uniq_id) || len == 0)
        {
            LOG_ERROR("failed to write data to send buf from pack queue, pool id:%d, buf id:%d", pool->poolId(), pool->writeId());
            // NOTE(review): send_buf was marked in-use by getSendBuf() but is
            // not released on this error path — looks like a pool-slot leak;
            // confirm against the pool's release/ack API.
            return SDKInvalidResult::kFailWriteToBuf;
        }
        send_buf->setLen(len);
        send_buf->setMsgCnt(msg_cnt);
        send_buf->setBid(inlong_group_id_);
        send_buf->setTid(inlong_stream_id_);
        send_buf->setUniqId(uniq_id);
        send_buf->setTarget(conn);
        send_buf->setIsPacked(true);

        // keep the user messages with the buffer for later callbacks/acks
        for (auto it : msg_set_)
        {
            send_buf->addUserMsg(it);
        }
    }

    pack_num_.increment();
    g_pools->addUid2BufPool(send_buf->uniqId(), pool); // used for ack finding

    resetPackQueue();

    pool->sendBufToConn(send_buf);
    return 0;
}
+
+    int32_t PackQueue::appendMsg(const std::string &msg, std::string 
client_ip, int64_t report_time, UserCallBack call_back)
+    {
+        //too long msg
+        if (msg.size() > g_config->ext_pack_size_)
+        {
+            LOG_ERROR("msg len (%d) more than ext_pack_size (%d)", msg.size(), 
g_config->ext_pack_size_);
+            return SDKInvalidResult::kMsgTooLong;
+        }
+
+        //if datatime is illegal, fix it using current time
+        if (Utils::isLegalTime(report_time))
+            data_time_ = report_time;
+        else
+        {
+            data_time_ = Utils::getCurrentMsTime();
+            pack_redotime_cnt_.increment();
+        }
+
+        //used for callback
+        if (call_back)
+        {
+            std::string user_client_ip = client_ip;
+            int64_t user_report_time = report_time;
+            if (client_ip.empty())
+            {
+                client_ip = "127.0.0.1";
+            }
+            std::string data_pack_format_attr = "__addcol1__reptime=" + 
Utils::getFormatTime(data_time_) + "&__addcol2__ip=" + client_ip;
+            msg_set_.emplace_back(std::make_shared<UserMsg>(msg, client_ip, 
data_time_, call_back, data_pack_format_attr, user_client_ip, 
user_report_time));
+        }
+        
+        cur_len_ += msg.size() + 1; // '\n' using one byte
+
+        if (g_config->isNormalDataPackFormat())
+        {
+            cur_len_ += 4;
+        }
+        if (g_config->isAttrDataPackFormat())
+        {
+            cur_len_ += constants::kAttrLen + 8;
+        }
+
+        //update last using time
+        last_use_ = Utils::getCurrentMsTime();
+
+        return 0;
+    }
+
+    /**
+     * @description: whether trigger pack data
+     * @param {uint64_t} report_time
+     * @param {int32_t} msg_len
+     */
+    bool PackQueue::isTriggerPack(uint64_t report_time, int32_t msg_len)
+    {
+        if (0 == cur_len_ || msg_set_.empty())
+            return false;
+
+        if (!Utils::isLegalTime(report_time))
+        { 
+            report_time = Utils::getCurrentMsTime();
+        }
+
+        int64_t max_pack_time_interval = 1800000; // FIXME: use user config?
+        bool time_trigger = false;                //timeout trigger
+        bool len_trigger = false;                 //content trigger
+        if (llabs(report_time - data_time_) > max_pack_time_interval ||
+            (report_time != data_time_ && report_time / 1000 / 3600 != 
data_time_ / 1000 / 3600))
+        {
+            time_trigger = true;
+        }
+        if (msg_len + cur_len_ > g_config->pack_size_)
+        {
+            len_trigger = true;
+        }
+
+        return (time_trigger || len_trigger);
+    }
+
/**
 * @description: serialize every queued message into pack_data, using the
 *               binary protocol when msg_type_ >= kBinPackMethod and the
 *               legacy (attr-string) protocol otherwise; the body is
 *               snappy-zipped when isZipAndOperate() decides it is worthwhile
 * @param {char*} pack_data: output buffer (a SendBuffer's content)
 * @param {uint32_t&} out_len: total bytes written (frame + 4-byte length prefix)
 * @param {uint32_t} uniq_id: package id, used later for ack matching
 * @return {*} true if packed successfully
 */
bool PackQueue::packOperate(char *pack_data, uint32_t &out_len, uint32_t uniq_id)
{
    if (!pack_data)
    {
        LOG_ERROR("nullptr, failed to allocate memory for buf");
        return false;
    }
    // Stage all message bodies into data_ first; the result is zipped and/or
    // copied into the output buffer below.
    uint32_t idx = 0;
    for (auto &it : msg_set_)
    {
        // msg_type >= 5: body format is data_len|data

        if (msg_type_ >= 5) // add data_len
        {
            *(uint32_t *)(&data_[idx]) = htonl(it->msg.size());
            idx += sizeof(uint32_t);
        }
        // add data
        memcpy(&data_[idx], it->msg.data(), it->msg.size());
        idx += static_cast<uint32_t>(it->msg.size());

        // add attrlen|attr when the pack format carries per-message attributes
        if (g_config->isAttrDataPackFormat())
        {
            *(uint32_t *)(&data_[idx]) = htonl(it->data_pack_format_attr.size());
            idx += sizeof(uint32_t);

            memcpy(&data_[idx], it->data_pack_format_attr.data(), it->data_pack_format_attr.size());
            idx += static_cast<uint32_t>(it->data_pack_format_attr.size());
        }

        // msgtype = 2/3 use '\n' as the message separator
        if (msg_type_ == 2 || msg_type_ == 3)
        {
            data_[idx] = '\n';
            ++idx;
        }
    }

    // message count carried in the header (at least 1)
    uint32_t cnt = 1;
    if (msg_set_.size())
    {
        cnt = msg_set_.size();
    }

    // pack
    if (msg_type_ >= constants::kBinPackMethod)
    {
        // binary protocol frame: header | body_len | body | attr_len | attr | magic
        char *bodyBegin = pack_data + sizeof(BinaryMsgHead) + sizeof(uint32_t); // head+body_len
        uint32_t body_len = 0;

        std::string snappy_res;
        bool isSnappy = isZipAndOperate(snappy_res, idx);
        char real_msg_type;

        if (isSnappy) // need zip
        {
            body_len = static_cast<uint32_t>(snappy_res.size());
            memcpy(bodyBegin, snappy_res.data(), body_len); // copy zipped data to buf
            // mark the snappy flag in the msg_type byte
            real_msg_type = (msg_type_ | constants::kBinSnappyFlag);
        }
        else
        {
            body_len = idx;
            memcpy(bodyBegin, data_, body_len);
            real_msg_type = msg_type_;
        }
        *(uint32_t *)(&(pack_data[sizeof(BinaryMsgHead)])) = htonl(body_len); // set bodylen

        bodyBegin += body_len;

        // bid_num, tid_num, ext_field, data_time, cnt, uniq
        uint32_t char_bid_flag = 0;
        std::string bid_tid_char;
        uint16_t bid_num = 0, tid_num = 0;
        if (g_config->enableCharBid() || groupId_num_ == 0 || streamId_num_ == 0) // using string groupid and streamid
        {
            bid_num = 0;
            tid_num = 0;
            bid_tid_char = topic_desc_;
            char_bid_flag = 0x4; // flag: ids are carried as strings in attr
        }
        else
        {
            bid_num = groupId_num_;
            tid_num = streamId_num_;
        }
        uint16_t ext_field = (g_config->extend_field_ | char_bid_flag);
        uint32_t data_time = data_time_ / 1000; // header carries seconds

        // attr
        std::string attr;
        if (g_config->enableTraceIP())
        {
            if (bid_tid_char.empty())
                attr = "node1ip=" + g_config->ser_ip_ + "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
            else
                attr = bid_tid_char + "&node1ip=" + g_config->ser_ip_ + "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
        }
        else
        {
            attr = topic_desc_;
        }
        // attrlen
        *(uint16_t *)bodyBegin = htons(attr.size());
        bodyBegin += sizeof(uint16_t);
        // attr
        memcpy(bodyBegin, attr.data(), attr.size());
        bodyBegin += attr.size();

        // magic number terminates the frame
        *(uint16_t *)bodyBegin = htons(constants::kBinaryMagic);

        // 25 = fixed header fields + body_len field + attr_len + magic
        uint32_t total_len = 25 + body_len + attr.size();

        // header
        char *p = pack_data;
        *(uint32_t *)p = htonl(total_len);
        p += 4;
        *p = real_msg_type;
        ++p;
        *(uint16_t *)p = htons(bid_num);
        p += 2;
        *(uint16_t *)p = htons(tid_num);
        p += 2;
        *(uint16_t *)p = htons(ext_field);
        p += 2;
        *(uint32_t *)p = htonl(data_time);
        p += 4;
        *(uint16_t *)p = htons(cnt);
        p += 2;
        *(uint32_t *)p = htonl(uniq_id);

        out_len = total_len + 4; // plus the 4-byte total_len prefix itself
    }
    else
    {
        if (msg_type_ == 3 || msg_type_ == 2)
        {
            --idx; // drop the trailing '\n' separator
        }

        // body, zipped when worthwhile
        char *bodyBegin = pack_data + sizeof(ProtocolMsgHead) + sizeof(uint32_t);
        uint32_t body_len = 0;
        std::string snappy_res;
        bool isSnappy = isZipAndOperate(snappy_res, idx);
        if (isSnappy)
        {
            body_len = static_cast<uint32_t>(snappy_res.size());
            memcpy(bodyBegin, snappy_res.data(), body_len); // copy
        }
        else
        {
            body_len = idx;
            memcpy(bodyBegin, data_, body_len);
        }
        *(uint32_t *)(&(pack_data[sizeof(ProtocolMsgHead)])) = htonl(body_len); // set bodylen
        bodyBegin += body_len;

        // attr: key=value pairs consumed by the DataProxy server
        std::string attr;
        attr = topic_desc_;
        attr += "&dt=" + std::to_string(data_time_);
        attr += "&mid=" + std::to_string(uniq_id);
        if (isSnappy)
            attr += "&cp=snappy";
        attr += "&cnt=" + std::to_string(cnt);
        attr += "&sid=" + std::string(Utils::getSnowflakeId());
        if (g_config->is_from_DC_)
        { //&__addcol1_reptime=yyyymmddHHMMSS&__addcol2__ip=BBB&f=dc
            attr += "&__addcol1_reptime=" + Utils::getFormatTime(Utils::getCurrentMsTime()) + "&__addcol2__ip=" + g_config->ser_ip_ + "&f=dc";
        }

        // attrlen
        *(uint32_t *)bodyBegin = htonl(attr.size());
        bodyBegin += sizeof(uint32_t);
        // attr
        memcpy(bodyBegin, attr.data(), attr.size());

        // total_len = msg_type(1) + body_len field(4) + body + attr_len(4) + attr
        uint32_t total_len = 1 + 4 + body_len + 4 + attr.size();
        *(uint32_t *)pack_data = htonl(total_len);
        // msg_type
        *(&pack_data[4]) = msg_type_;

        LOG_TRACE("after packoperate: total_len:%d, body_len:%d, attr_len:%d", total_len, body_len, attr.size());

        out_len = total_len + 4;
    }
    return true;
}
+
+    /**
+     * @description: whether body data should be zipped; if needs, zip data 
and save as res
+     * @param {string&} res
+     * @param {uint32_t} real_cur_len: data len before zip
+     * @return {*} true if needing zip
+     */
+    bool PackQueue::isZipAndOperate(std::string &res, uint32_t real_cur_len)
+    {
+        if (g_config->enable_zip_ && real_cur_len > g_config->min_zip_len_)
+        {
+            LOG_TRACE("start snappy.");
+            Utils::zipData(data_, real_cur_len, res);
+            return true;
+        }
+        else
+            return false;
+    }
+
// Called periodically by the pack thread: flushes this queue once the pack
// timeout has elapsed, or unconditionally when isLastPack is true (e.g. at
// shutdown). On flush failure every staged message's user callback is
// invoked and the queue is reset.
void PackQueue::checkQueue(bool isLastPack)
{
    // NOTE(review): cur_len_, msg_set_ and first_use_ are read before taking
    // mutex_ while sendMsg() mutates them under the lock — looks racy;
    // confirm the intended locking discipline.
    if (cur_len_ == 0 || msg_set_.empty())
        return;
    // not timed out yet, and it isn't the final flush
    if (Utils::getCurrentMsTime() - first_use_ < g_config->pack_timeout_ && !isLastPack) // FIXME:should use first_use instead of last_use?
        return;
    LOG_TRACE("start auto pack, inlong_group_id:%s, inlong_stream_id:%s", inlong_group_id_.c_str(), inlong_stream_id_.c_str());

    std::lock_guard<std::mutex> lck(mutex_);
    last_use_ = Utils::getCurrentMsTime();
    // send failed: report via callbacks, then drop the batch
    if (writeToBuf())
    {
        for (auto &it : msg_set_)
        {
            if (it->cb) { it->cb(inlong_group_id_.data(), inlong_stream_id_.data(), it->msg.data(), it->msg.size(), it->user_report_time, it->user_client_ip.data()); }
        }
        resetPackQueue();
    }
}
+
+    GlobalQueues::~GlobalQueues()
+    {
+        closeCheckSubroutine();
+        if (worker_.joinable())
+        {
+            worker_.join();
+        }
+        for (auto it : queues_) 
+        {
+            it.second->checkQueue(true);
+        }
+        queues_.clear();
+    }
+
+    void GlobalQueues::startCheckSubroutine() { worker_ = 
std::thread(&GlobalQueues::checkPackQueueSubroutine, this); }
+
+    void GlobalQueues::checkPackQueueSubroutine()
+    {
+        LOG_INFO("start checkPackQueue subroutine");
+        while (!exit_flag_)
+        {
+            Utils::taskWaitTime(2); // FIXME:improve pack interval
+            for (auto it : queues_)
+            {
+                it.second->checkQueue(exit_flag_);
+            }
+        }
+        LOG_WARN("exit checkPackQueue subroutine");
+    }
+
+    //get pack queue, create if not exists
+    PackQueuePtr GlobalQueues::getPackQueue(const std::string 
&inlong_group_id, const std::string &inlong_stream_id)
+    {
+        std::lock_guard<std::mutex> lck(mutex_);
+
+        auto it = queues_.find(inlong_group_id + inlong_stream_id);
+        if (it != queues_.end())
+            return it->second;
+        else
+        {
+            PackQueuePtr p = std::make_shared<PackQueue>(inlong_group_id, 
inlong_stream_id);
+            queues_.emplace(inlong_group_id + inlong_stream_id, p);
+            return p;
+        }
+    }
+    void GlobalQueues::printAck()
+    {
+        if (queues_.empty())
+            return;
+        for (auto &it : queues_)
+        {
+            LOG_WARN("-------> dataproxy_sdk_cpp #local:%s#%s#success send 
msg:%d", g_config->ser_ip_.c_str(), it.second->topicDesc().c_str(),
+                     it.second->success_num_.getAndSet(0));
+        }
+    }
+
+    void GlobalQueues::printTotalAck()
+    {
+        if (queues_.empty())
+            return;
+        for (auto &it : queues_)
+        {
+            LOG_WARN("-------> dataproxy_sdk_cpp #local:%s#%s#total success 
msg:%d", g_config->ser_ip_.c_str(), it.second->topicDesc().c_str(),
+                     it.second->total_success_num_.get());
+        }
+    }
+
+    void GlobalQueues::showState()
+    {
+        if (1 == user_exit_flag.get())
+            return;
+        uint32_t total_pack = 0;
+        for (auto &it : queues_)
+        {
+            uint32_t pack = it.second->pack_num_.getAndSet(0);
+            total_pack += pack;
+            LOG_DEBUG("------->toipc:%s, pack_num:%d", 
it.second->topicDesc().c_str(), pack);
+        }
+        LOG_DEBUG("------->total_pack:%d", total_pack);
+
+        g_pools->showState();
+        g_executors->showState();
+    }
+
+} // namespace dataproxy_sdk
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.h
new file mode 100644
index 000000000..dccf09b97
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/pack_queue.h
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_PACK_QUEUE_H_
+#define DATAPROXY_SDK_BASE_PACK_QUEUE_H_
+
#include <atomic>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "atomic.h"
#include "buffer_pool.h"
#include "client_config.h"
#include "noncopyable.h"
#include "sdk_core.h"
#include "user_msg.h"
+namespace dataproxy_sdk
+{
// One pack queue per (inlong_group_id, inlong_stream_id) pair: stages user
// messages and periodically packs them into a SendBuffer for sending.
class PackQueue
{
  private:
    char* data_;                       // staging buffer for serialized message bodies
    std::vector<UserMsgPtr> msg_set_;  // messages waiting to be packed
    uint32_t data_capacity_;           // pack_size+extend_pack_size
    uint32_t cur_len_;                 // staged bytes: data + additional attr
    AtomicInt pack_redotime_cnt_;  // count of msgs whose report time was illegal
    AtomicInt pack_err_;           // pack err metrics
    uint64_t first_use_;           // ms timestamp of first append since last reset
    uint64_t last_use_;            // ms timestamp of most recent append/flush
    uint64_t data_time_;           // data time (ms) carried in the pack header
    std::string inlong_group_id_;
    std::string inlong_stream_id_;
    uint16_t groupId_num_;         // numeric group id (0 = string ids are used)
    uint16_t streamId_num_;        // numeric stream id (0 = string ids are used)
    std::string topic_desc_;  // inlong_group_id=xxx&inlong_stream_id=xxx
    uint32_t msg_type_;       // protocol message type, taken from config
    mutable std::mutex mutex_;  // guards queue state

  public:
    AtomicUInt success_num_;        //ack msg num in every one min
    AtomicUInt total_success_num_;  //total ack msg num

    AtomicUInt pack_num_;  //package num in every one min

  public:
    PackQueue(const std::string& inlong_group_id, const std::string& inlong_stream_id);
    ~PackQueue();

    // Queue one message; may flush the previously staged batch first.
    int32_t sendMsg(const std::string& msg,
                    const std::string& inlong_group_id,
                    const std::string& inlong_stream_id,
                    const std::string& client_ip,
                    uint64_t report_time,
                    UserCallBack cb);
    // Serialize the staged batch into pack_data; out_len receives the frame size.
    bool packOperate(char* pack_data, uint32_t& out_len, uint32_t uniq_id);
    void checkQueue(bool isLastPack);  //if it's last pack, pack and send the rest data
    uint32_t curLen() const { return cur_len_; }
    void setCurLen(const uint32_t& cur_len) { cur_len_ = cur_len; }
    uint64_t firstUse() const { return first_use_; }
    void setFirstUse(const uint64_t& first_use) { first_use_ = first_use; }
    uint64_t lastUse() const { return last_use_; }
    void setLastUse(const uint64_t& last_use) { last_use_ = last_use; }
    uint64_t dataTime() const { return data_time_; }
    void setDataTime(const uint64_t& data_time) { data_time_ = data_time; }
    char* data() const { return data_; }

    std::string inlong_group_id() const { return inlong_group_id_; }
    void setBid(const std::string& inlong_group_id) { inlong_group_id_ = inlong_group_id; }

    void increasePackErr() { pack_err_.increment(); }

    std::string topicDesc() const { return topic_desc_; }

  private:
    bool isTriggerPack(uint64_t msg_time, int32_t msg_len);
    int32_t writeToBuf();
    int32_t appendMsg(const std::string& msg, std::string client_ip, int64_t report_time, UserCallBack call_back);
    bool isZipAndOperate(std::string& res, uint32_t real_cur_len);
    // Clear staged data and counters after a flush.
    inline void resetPackQueue()
    {
        memset(data_, 0x0, data_capacity_);
        cur_len_ = 0;
        msg_set_.clear();
    }
};
+
+using PackQueuePtr = std::shared_ptr<PackQueue>;
+
+// all packqueues
+class GlobalQueues : noncopyable
+{
+  private:
+    std::unordered_map<std::string, PackQueuePtr> queues_;
+    mutable std::mutex mutex_;
+    std::thread worker_;  //pack thread
+    bool exit_flag_;
+
+  public:
+    GlobalQueues() : exit_flag_(false){};
+    virtual ~GlobalQueues();
+    void startCheckSubroutine();
+    virtual PackQueuePtr getPackQueue(const std::string& inlong_group_id, 
const std::string& inlong_stream_id);
+    void closeCheckSubroutine() { exit_flag_ = true; }
+    void printAck();
+    void printTotalAck();
+
+    void showState();
+
+  private:
+    void checkPackQueueSubroutine();
+};
+
+}  // namespace dataproxy_sdk
+
+#endif  // DATAPROXY_SDK_BASE_PACK_QUEUE_H_
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.cc
new file mode 100644
index 000000000..6604b625d
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.cc
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "buffer_pool.h"
+
+#include <chrono>
+#include <memory>
+#include <vector>
+
+#include "proxylist_config.h"
+#include "sdk_core.h"
+#include "logger.h"
+#include "send_buffer.h"
+#include "socket_connection.h"
+#include "tc_api.h"
+
+namespace dataproxy_sdk
+{
+    BufferPool::BufferPool(uint32_t pool_id, uint32_t buf_num, uint32_t 
buf_size)
+        : pool_id_(pool_id), buf_num_(buf_num), read_(0), write_(0), 
current_use_(0), pool_mutex_(), 
executor_(std::make_shared<ExecutorThread>(pool_id))
+    {
+        buffers_.reserve(buf_num_);
+        for (int i = 0; i < buf_num_; i++)
+        {
+            buffers_.push_back(new SendBuffer(buf_size));
+        }
+    }
+
+    BufferPool::~BufferPool()
+    { 
+        for (auto it : buffers_)
+        {
+            delete it;
+        }
+        buffers_.clear();
+        LOG_DEBUG("free send buffer memory, pool(id:%d)", pool_id_);
+    }
+
// True while at least one buffer slot in the pool is free.
// NOTE(review): keeps the original (misspelled) public name "isAvaliable"
// since callers elsewhere depend on it.
// NOTE(review): reads current_use_ without pool_mutex_ — presumably only a
// hint; confirm callers tolerate a stale answer.
bool BufferPool::isAvaliable()
{
    return current_use_ < buf_num_;
}
+
// Acquire a free SendBuffer from the pool, scanning round-robin from write_.
// On success returns 0 and sets send_buf (marked in-use); otherwise returns
// an SDKInvalidResult code.
int32_t BufferPool::getSendBuf(SendBuffer *&send_buf)
{
    std::lock_guard<std::mutex> lck(pool_mutex_);

    if (current_use_ >= buf_num_)
    {
        LOG_ERROR("buffer pool(id:%d) is full", pool_id_);
        return SDKInvalidResult::kBufferPoolFull;
    }

    // Skip in-use slots; the full-pool check above guarantees at least one
    // free slot exists, so this scan terminates.
    while (buffers_[write_]->isUsed())
    {
        write_ = (write_ + 1) % buf_num_;
    }

    send_buf = buffers_[write_];
    // ++write_;
    if (!send_buf)
    {
        LOG_ERROR("failed to get send buf, pool(id:%d)", pool_id_);
        return SDKInvalidResult::kFailGetSendBuf;
    }

    // mark the slot busy until it is acked/reset
    send_buf->setIsUsed(true);
    ++current_use_;

    return 0;
}
+
// Hand a filled SendBuffer to its target connection, first arming the retry
// timer when retries are configured. Returns 0 on success, -1 when no
// connection could be created (the buffer stays queued in
// already_send_buf_list_ for a later retry attempt).
int32_t BufferPool::sendBufToConn(SendBuffer *&send_buf)
{
    std::lock_guard<std::mutex> pool_lck(pool_mutex_);

    std::lock_guard<std::mutex> buf_lck(send_buf->mutex_);

    auto self = shared_from_this();
    if (g_config->retry_num_ > 0 && g_config->retry_interval_ > 0) //resend if need
    {
        // NOTE(review): the task captures both `self` and raw `this`, and the
        // wait handler calls shared_from_this() again — one owning capture
        // would seem to suffice; confirm the intended lifetime guarantees.
        executor_->postTask([self, this, send_buf]
                            {
            send_buf->timeout_timer_ = executor_->createSteadyTimer();
            send_buf->timeout_timer_->expires_after(std::chrono::milliseconds(g_config->retry_interval_));
            send_buf->timeout_timer_->async_wait(std::bind(&BufferPool::RetryHandler, shared_from_this(), std::placeholders::_1, send_buf)); });
    }
    if (send_buf->target()->isStop())
    {
        // current connection is closed: try to set up a fresh one
        auto new_conn = g_clusters->createActiveConn(send_buf->inlong_group_id(), pool_id_);
        if (!new_conn)
        {
            LOG_WARN("first send buf, fail to create new conn for sendbuf, inlong_group_id:%s, inlong_stream_id:%s", send_buf->inlong_group_id().c_str(), send_buf->inlong_stream_id().c_str());
            send_buf->fail_create_conn_.increment();
            // send_buf->increaseRetryNum();
            // send_err_.increment();
            already_send_buf_list_[send_buf->uniqId()] = send_buf; //preparing for next sending
            return -1;
        }
        send_buf->setTarget(new_conn);
    }
    send_buf->target()->sendBuf(send_buf); //do send operation
    send_buf->increaseRetryNum();
    LOG_TRACE("request buf(id:%d) send to connection%s first time, inlong_group_id:%s, inlong_stream_id:%s", send_buf->uniqId(), send_buf->target()->getRemoteInfo().c_str(),
              send_buf->inlong_group_id().c_str(), send_buf->inlong_stream_id().c_str());

    already_send_buf_list_[send_buf->uniqId()] = send_buf; //add buf uid into sent list

    send_total_.increment();

    //update metrics
    has_send_buf_.increment();
    waiting_ack_.increment();

    LOG_TRACE("success to write buf pool, pool(id:%d), buf(uid:%d), inlong_group_id:%s, inlong_stream_id:%s", pool_id_, send_buf->uniqId(), send_buf->inlong_group_id().c_str(),
              send_buf->inlong_stream_id().c_str());
    return 0;
}
+
    // Timer callback driving the ack-timeout retry loop for one buffer.
    //
    // Invoked by the asio steady timer armed in sendBufToConn(). `ec` is set
    // when the timer was cancelled (ack arrived, or msg_type==2 write path);
    // otherwise the wait expired and the buffer has not been acked yet.
    // On expiry it either gives up (user callback + reset) or resends and
    // re-arms the timer for another round.
    void BufferPool::RetryHandler(const std::error_code &ec, SendBuffer *buf)
    {
        if (ec) // timer is cancelled, two cases: 1.ackbuf->sendbuf.reset;2.msg_type=2,conn.doWrite->cancel
        {
            if (g_config->msg_type_ == 2)
            {
                // msg_type 2 expects no ack from the server, so a cancelled
                // timer means the write completed: release the buffer now.
                LOG_TRACE("msg_type is 2, no need ackmsg, clear buf(uid:%d) directly", buf->uniqId());
                ackBuf(buf->uniqId());
            }
            return;
        }

        // NOTE(review): is_used_ is read here before buf->mutex_ is taken —
        // looks racy with ackBufHelper()'s reset; confirm intended.
        if (!buf->isUsed())
        {
            return;
        } // buf is already acked before retry

        std::lock_guard<std::mutex> buf_lck(buf->mutex_);

        // Both branches log the same text; only the level differs
        // (INFO for the first retry, DEBUG for subsequent ones).
        if (buf->getAlreadySend() == 2)
        {
            LOG_INFO("buf(id:%d, inlong_group_id:%s, inlong_stream_id:%s) ackmsg timeout, send %d times(max retry_num:%d)", buf->uniqId(), buf->inlong_group_id().c_str(),
                  buf->inlong_stream_id().c_str(), buf->getAlreadySend(), g_config->retry_num_);
        }
        else
        {
            LOG_DEBUG("buf(id:%d, inlong_group_id:%s, inlong_stream_id:%s) ackmsg timeout, send %d times(max retry_num:%d)", buf->uniqId(), buf->inlong_group_id().c_str(),
                  buf->inlong_stream_id().c_str(), buf->getAlreadySend(), g_config->retry_num_);
        }

        // max_retry_num, usercallback
        // Give up when either the send count or the consecutive
        // connection-creation failures hit their limits.
        if (buf->getAlreadySend() >= g_config->retry_num_ || buf->fail_create_conn_.get() >= constants::kMaxRetryConnection)
        {
            LOG_WARN("fail to send buf(id:%d, inlong_group_id:%s, inlong_stream_id:%s), has send max_retry_num(%d) times, start usercallback", buf->uniqId(), buf->inlong_group_id().c_str(),
                     buf->inlong_stream_id().c_str(), g_config->retry_num_);
            buf->doUserCallBack();
            buf->reset();
        }
        else // ack timeout, resend
        {
            // Replace a missing/stopped connection before resending.
            if (!buf->target() || buf->target()->isStop())
            {
                auto new_conn = g_clusters->createActiveConn(buf->inlong_group_id(), pool_id_); // TODO: should improve as choosing from active conn instread of creating?

                if (!new_conn) //create conn error, waiting for next creating
                {
                    LOG_INFO("fail to create new conn to send buf, inlong_group_id:%s, inlong_stream_id:%s", buf->inlong_group_id().c_str(), buf->inlong_stream_id().c_str());
                    buf->fail_create_conn_.increment();
                    // Re-arm the timer and try again on the next tick.
                    buf->timeout_timer_->expires_after(std::chrono::milliseconds(g_config->retry_interval_));
                    buf->timeout_timer_->async_wait(std::bind(&BufferPool::RetryHandler, shared_from_this(), std::placeholders::_1, buf));
                    return;
                }
                buf->setTarget(new_conn);
            }

            // Resend and schedule the next timeout check.
            buf->target()->sendBuf(buf);
            buf->increaseRetryNum();
            buf->timeout_timer_->expires_after(std::chrono::milliseconds(g_config->retry_interval_));
            buf->timeout_timer_->async_wait(std::bind(&BufferPool::RetryHandler, shared_from_this(), std::placeholders::_1, buf));
        }
    }
+
    // Complete the acknowledgement of a previously sent buffer: update the
    // success metrics, reset the buffer (which also cancels its retry timer),
    // and remove it from the in-flight list.
    //
    // Runs on the executor thread (posted by ackBuf()). Locking order is
    // pool mutex first, then buffer mutex — same as sendBufToConn().
    void BufferPool::ackBufHelper(uint32_t uniq_id)
    {
        std::lock_guard<std::mutex> lck(pool_mutex_);
        if (already_send_buf_list_.find(uniq_id) == already_send_buf_list_.end())
        {
            LOG_ERROR("no buf(uid:%d) in already_send_buf_list", uniq_id);
            return;
        }
        auto &buf2ack = already_send_buf_list_[uniq_id];

        std::lock_guard<std::mutex> buf_lck(buf2ack->mutex_);

        int32_t msg_cnt = buf2ack->msgCnt();
        success_ack_.add(msg_cnt);

        //update packqueue metrics
        // NOTE(review): getPackQueue() result is dereferenced without a null
        // check — presumably it always returns a valid queue; confirm.
        if(g_queues){
            auto packqueue = g_queues->getPackQueue(buf2ack->inlong_group_id(), buf2ack->inlong_stream_id());
            packqueue->success_num_.add(msg_cnt);
            packqueue->total_success_num_.add(msg_cnt);
        }

        // reset() clears the payload and cancels the retry timer; the buffer
        // object itself stays alive in buffers_ for reuse by getSendBuf().
        already_send_buf_list_[uniq_id]->reset();
        already_send_buf_list_.erase(uniq_id);
        --current_use_;

        waiting_ack_.decrement();
        has_ack_.increment();
        LOG_TRACE("pool(id:%d) success ack msg cumulative num: %d", pool_id_, success_ack_.get());
    }
+
+    void BufferPool::showState(const std::string &inlong_group_id)
+    {
+        if (inlong_group_id.empty())
+        {
+            LOG_INFO("STATE|pool_id:%d, current_use:%d(total:%d), 
invoke_send_buf:%d, has_ack:%d, waiting_ack:%d", pool_id_, currentUse(),
+                      g_config->bufNum(), has_send_buf_.get(), has_ack_.get(), 
waiting_ack_.get());
+        }
+        else
+        {
+            LOG_INFO("STATE|inlong_group_id:%s, pool_id:%d, 
current_use:%d(total:%d), invoke_send_buf:%d, has_ack:%d, waiting_ack:%d", 
inlong_group_id.c_str(), pool_id_, currentUse(),
+                      g_config->bufNum(), has_send_buf_.get(), has_ack_.get(), 
waiting_ack_.get());
+        }
+    }
+
+    void BufferPool::close()
+    {
+        if (executor_ != nullptr)
+        {
+            executor_->close();
+        }
+        executor_.reset();
+    }
+
+    TotalPools::TotalPools() : next_(0), mutex_()
+    {
+        if (g_config->enable_groupId_isolation_) // different groupid data use 
different bufpool
+        {
+            for (int32_t i = 0; i < g_config->inlong_group_ids_.size(); i++) 
// create a bufpool for ervery groupidS
+            {
+                std::vector<BufferPoolPtr> bid_pool;
+                bid_pool.reserve(g_config->buffer_num_per_groupId_);
+                for (int32_t j = 0; j < g_config->buffer_num_per_groupId_; j++)
+                {
+                    bid_pool.push_back(std::make_shared<BufferPool>(j, 
g_config->bufNum(), g_config->buf_size_));
+                }
+
+                bid2pool_map_[g_config->inlong_group_ids_[i]] = bid_pool;
+                bid2next_[g_config->inlong_group_ids_[i]] = 0;
+            }
+        }
+        else // round-robin
+        {
+            pools_.reserve(g_config->shared_buf_nums_);
+            for (int i = 0; i < g_config->shared_buf_nums_; i++)
+            {
+                pools_.push_back(std::make_shared<BufferPool>(i, 
g_config->bufNum(), g_config->buf_size_));
+            }
+        }
+    }
+
    // Pick an available buffer pool for the given groupId.
    //
    // In isolation mode the pool is taken round-robin from the groupId's own
    // set; in shared mode from the global set. Returns nullptr when no pool
    // currently has free buffers.
    //
    // NOTE(review): next_ and bid2next_ are read-modify-written here without
    // holding mutex_ — looks racy if getPool() is called from multiple
    // threads; confirm against the callers.
    BufferPoolPtr TotalPools::getPool(const std::string &inlong_group_id)
    {
        if (g_config->enable_groupId_isolation_) // groupId isolation
        {
            auto bid_pool = bid2pool_map_.find(inlong_group_id);
            if (bid_pool == bid2pool_map_.end() || bid_pool->second.empty())
            {
                LOG_ERROR("fail to get bufferpool, inlong_group_id:%s", inlong_group_id.c_str());
                return nullptr;
            }
            if (bid2next_.find(inlong_group_id)==bid2next_.end())
            {
                bid2next_[inlong_group_id]=0;
            }

            // Try each pool at most once, starting from the rotating cursor.
            auto& pool_set=bid_pool->second;
            int32_t idx=0;
            for (int32_t i = 0; i < pool_set.size(); i++)
            {
                idx = (bid2next_[inlong_group_id]++) % pool_set.size();
                if (pool_set[idx]->isAvaliable())
                {
                    return pool_set[idx];
                }

            }

            return nullptr;
        }
        else
        {
            if (pools_.empty())
            {
                LOG_ERROR("fail to get bufferpool, allocate error in tc_init");
                return nullptr;
            }

            // Shared mode: same one-round scan over the global pool set.
            int32_t idx=0;
            for (int32_t i = 0; i < pools_.size(); i++)
            {
                idx = (next_++) % pools_.size();
                if (pools_[idx]->isAvaliable())
                {
                    return pools_[idx];
                }

            }

            return nullptr;

        }
    }
+
+    bool TotalPools::isPoolAvailable(const std::string &inlong_group_id)
+    {
+
+        if (g_config->enable_groupId_isolation_) // bid_isolation
+        {
+            auto bid_pool = bid2pool_map_.find(inlong_group_id);
+            if (bid_pool == bid2pool_map_.end())
+            {
+                LOG_ERROR("no buffer allocated to inlong_group_id:%s, check 
config", inlong_group_id.c_str());
+                return false;
+            }
+            for (int i = 0; i < bid_pool->second.size(); i++)
+            {
+                if (bid_pool->second[i]->isAvaliable())
+                {
+                    return true;
+                }
+            }
+            return false;
+        }
+        else // rr
+        {
+            for (int i = 0; i < pools_.size(); i++)
+            {
+                if (pools_[i]->isAvaliable())
+                {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
    // Remember which pool owns the in-flight buffer `uniqId` so the ack path
    // can route the acknowledgement back to it (see getUidBufPool()).
    void TotalPools::addUid2BufPool(uint32_t uniqId, BufferPoolPtr& bpr){
        std::lock_guard<std::mutex> lck(mutex_);
        uid2buf_pool_[uniqId]=bpr;
    }
+
+    BufferPoolPtr TotalPools::getUidBufPool(uint32_t uniqId){
+        std::lock_guard<std::mutex> lck(mutex_);
+        auto iter=uid2buf_pool_.find(uniqId);
+        if(iter==uid2buf_pool_.end()){
+            return nullptr;
+        }
+        uid2buf_pool_.erase(iter); //TODO: need add it into bufpool after ack 
operation
+        
+        return iter->second;
+        
+    }
+
+    void TotalPools::showState()
+    {
+        if (g_config->enable_groupId_isolation_)
+        {
+            for (auto &bid_pool : bid2pool_map_)
+            {
+                showStateHelper(bid_pool.first, bid_pool.second);
+            }
+        }
+        else
+        {
+            showStateHelper("", pools_);
+        }
+    }
+
+    void TotalPools::showStateHelper(const std::string &inlong_group_id, 
std::vector<BufferPoolPtr> &pools)
+    {
+        for (auto &pool : pools)
+        {
+            pool->showState(inlong_group_id);
+        }
+    }
+
    // Shut down every pool, after giving in-flight buffers a fixed grace
    // period to be acknowledged.
    //
    // NOTE(review): uid2buf_pool_.size() is read here without taking mutex_,
    // and the 10s wait is a fixed sleep rather than a condition wait —
    // presumably acceptable at shutdown; confirm.
    void TotalPools::close()
    {
        // waiting for sending the rest data
        if (uid2buf_pool_.size())
        {
            LOG_INFO("waiting for 10s to ack remaining msg");
            std::this_thread::sleep_for(std::chrono::seconds(10));
        }

        // Shared-mode pools.
        for (auto &pool : pools_)
        {
            pool->close();
        }

        // Isolation-mode pools (only one of the two containers is populated,
        // so closing both is harmless).
        for (auto &bid_pool : bid2pool_map_)
        {
            for (auto &pool : bid_pool.second)
            {
                pool->close();
            }
        }
    }
+
+} // namespace dataproxy_sdk
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.h
new file mode 100644
index 000000000..849522c38
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/buffer_pool.h
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_NET_BUFFER_POOL_H_
+#define DATAPROXY_SDK_NET_BUFFER_POOL_H_
+
+#include <deque>
+#include <mutex>
+#include <system_error>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+#include "atomic.h"
+#include "sdk_core.h"
+#include "noncopyable.h"
+#include "pack_queue.h"
+#include "recv_buffer.h"
+#include "executor_thread_pool.h"
+
+namespace dataproxy_sdk
+{
  class SendBuffer;

  // A fixed-size pool of pre-allocated SendBuffers plus the bookkeeping for
  // sending them, retrying on ack timeout, and recycling them after ack.
  // Inherits enable_shared_from_this because retry/ack callbacks posted to
  // the executor must keep the pool alive.
  class BufferPool : noncopyable, public std::enable_shared_from_this<BufferPool>
  {
  private:
    uint32_t pool_id_;
    uint32_t buf_num_; //sendbuf count
    uint32_t read_, write_;   // cursors into buffers_; write_ is advanced when handing out a free buffer
    int32_t current_use_;//sendbuf being used count
    std::vector<SendBuffer *> buffers_; // allocate memory
    std::unordered_map<uint32_t, SendBuffer *> already_send_buf_list_; //key:uniq_id_, sent but not ack
    mutable std::mutex pool_mutex_;   // guards buffers_/write_/current_use_/already_send_buf_list_
    ExecutorThreadPtr executor_;      // worker thread running timers and ack tasks

    AtomicInt send_total_;   //send buf num, including resend
    AtomicInt send_err_;     //fail send buf num, including resend
    AtomicInt resend_total_; //total resent package
    AtomicInt success_ack_;  //total ack msg

  public:
    // bufusage metrics
    AtomicUInt waiting_ack_;
    AtomicUInt has_ack_;
    AtomicUInt has_send_buf_; //sent to conn

  public:
    BufferPool(uint32_t pool_id, uint32_t buf_num, uint32_t buf_size);
    virtual ~BufferPool();

    bool isAvaliable(); //is bufpool avaliable
    // Hands out an unused buffer from the pool; returns a nonzero error code
    // when the pool is full or no buffer can be obtained.
    virtual int32_t getSendBuf(SendBuffer *&send_buf);
    // First transmission of a packed buffer; also arms the retry timer.
    int32_t sendBufToConn(SendBuffer *&send_buf);

    void ackBufHelper(uint32_t uinq_id); // ack
    // Asynchronous ack entry point: the actual work runs on the executor thread.
    void ackBuf(uint32_t uniq_id) { executor_->postTask(std::bind(&BufferPool::ackBufHelper, shared_from_this(), uniq_id)); }

    uint32_t poolId() const { return pool_id_; }

    uint32_t writeId() const { return write_; }

    int32_t currentUse() const { return current_use_; }

    // Log this pool's usage counters (optionally tagged with a groupId).
    void showState(const std::string &inlong_group_id);

    void close();

  private:
    // Steady-timer callback: resend on ack timeout or give up via user callback.
    void RetryHandler(const std::error_code &ec, SendBuffer *buf);
  };

  using BufferPoolPtr = std::shared_ptr<BufferPool>;
+
  // Owns every BufferPool in the process and routes requests to them, in one
  // of two modes chosen by configuration: a shared round-robin set, or one
  // dedicated set per inlong_group_id (isolation).
  class TotalPools : noncopyable
  {
  private:
    // round-robin
    std::vector<BufferPoolPtr> pools_;
    int32_t next_;    // rotating cursor into pools_

    // bid_isolation
    std::unordered_map<std::string, std::vector<BufferPoolPtr>> bid2pool_map_;
    std::unordered_map<std::string, int32_t> bid2next_;  // per-groupId rotating cursor

    std::unordered_map<uint32_t, BufferPoolPtr> uid2buf_pool_;// <sent buf uid, bufpool>

    mutable std::mutex mutex_;   // guards uid2buf_pool_

    void close(); // shutdown buffer thread
    void showStateHelper(const std::string &inlong_group_id, std::vector<BufferPoolPtr> &pools);

  public:
    TotalPools();
    virtual ~TotalPools() { close(); } // FIXME: need other operations?
    // True when at least one pool serving the groupId has free buffers.
    bool isPoolAvailable(const std::string &inlong_group_id);
    // Round-robin selection of an available pool for the groupId; nullptr if none.
    virtual BufferPoolPtr getPool(const std::string &inlong_group_id);
    // Record / retrieve-and-remove the pool owning an in-flight buffer uid.
    void addUid2BufPool(uint32_t uniqId, BufferPoolPtr& bpr);
    BufferPoolPtr getUidBufPool(uint32_t uniqId);
    void showState();
  };
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_NET_BUFFER_POOL_H_
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/send_buffer.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/send_buffer.h
new file mode 100644
index 000000000..673c89186
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/net/send_buffer.h
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_NET_SEND_BUFFER_H_
+#define DATAPROXY_SDK_NET_SEND_BUFFER_H_
+
+#include <mutex>
+#include <string>
+
+#include "atomic.h"
+#include "sdk_core.h"
+// #include "executor_thread_pool.h"
+#include "logger.h"
+#include "noncopyable.h"
+#include "user_msg.h"
+// #include "socket_connection.h"
+
+namespace dataproxy_sdk
+{
+class SendBuffer : noncopyable
+{
+  private:
+    uint32_t uniq_id_;                      
+    bool is_used_;                          
+    bool is_packed_;                        //is packed completed
+    char* content_;                         //send data
+    uint32_t size_;                         //buf_size
+    int32_t msg_cnt_;                       
+    uint32_t len_;                          //send data len
+    std::string inlong_group_id_;                      
+    std::string inlong_stream_id_;                      
+    AtomicInt already_send_;                
+    uint64_t first_send_time_;              //ms
+    uint64_t latest_send_time_;             //ms
+    ConnectionPtr target_;                  //send conn
+    std::vector<UserMsgPtr> user_msg_set_;
+
+  public:
+    std::mutex mutex_;
+    SteadyTimerPtr timeout_timer_;  //timeout, resend
+    AtomicInt fail_create_conn_;    //create conn fail count
+
+  public:
+    SendBuffer(uint32_t size)
+        : is_used_(false)
+        , is_packed_(false)
+        , mutex_()
+        , msg_cnt_(0)
+        , len_(0)
+        , inlong_group_id_()
+        , inlong_stream_id_()
+        , first_send_time_(0)
+        , latest_send_time_(0)
+        , target_(nullptr)
+        , uniq_id_(0)
+        , timeout_timer_(nullptr)
+        , size_(size)
+    {
+        content_ = new char[size];
+        if (content_)
+        {
+            memset(content_, 0x0, size);
+        }
+    }
+    ~SendBuffer()
+    {
+        if (content_) { delete[] content_; }
+        content_ = nullptr;
+    }
+
+    char* content() { return content_; }
+    int32_t msgCnt() const { return msg_cnt_; }
+    void setMsgCnt(const int32_t& msg_cnt) { msg_cnt_ = msg_cnt; }
+    uint32_t len() { return len_; }
+    void setLen(const uint32_t len) { len_ = len; }
+    std::string inlong_group_id() { return inlong_group_id_; }
+    std::string inlong_stream_id() { return inlong_stream_id_; }
+    void setBid(const std::string& inlong_group_id) { inlong_group_id_ = 
inlong_group_id; }
+    void setTid(const std::string& inlong_stream_id) { inlong_stream_id_ = 
inlong_stream_id; }
+    uint64_t firstSendTime() const { return first_send_time_; }
+    void setFirstSendTime(const uint64_t& first_send_time) { first_send_time_ 
= first_send_time; }
+    uint64_t latestSendTime() const { return latest_send_time_; }
+    void setLatestSendTime(const uint64_t& latest_send_time) { 
latest_send_time_ = latest_send_time; }
+
+    ConnectionPtr target() const { return target_; }
+    void setTarget(ConnectionPtr& target) { target_ = target; }
+
+    inline void increaseRetryNum() { already_send_.increment(); }
+    inline int32_t getAlreadySend() { return already_send_.get(); }
+
+    uint32_t uniqId() const { return uniq_id_; }
+    void setUniqId(const uint32_t& uniq_id) { uniq_id_ = uniq_id; }
+
+    void addUserMsg(UserMsgPtr u_msg) { user_msg_set_.push_back(u_msg); }
+    void doUserCallBack()
+    {
+        LOG_TRACE("failed to send msg, start user call_back");
+        for (auto it : user_msg_set_)
+        {
+            if (it->cb) { it->cb(inlong_group_id_.data(), 
inlong_stream_id_.data(), it->msg.data(), it->msg.size(), it->user_report_time, 
it->user_client_ip.data()); }
+        }
+    }
+
+    void reset()
+    {
+        uint32_t record_uid = uniq_id_;  // for debug
+
+        uniq_id_   = 0;
+        is_used_   = false;
+        is_packed_ = false;
+        memset(content_, 0x0, size_);
+        msg_cnt_ = 0;
+        len_     = 0;
+        inlong_group_id_     = "";
+        inlong_stream_id_     = "";
+        already_send_.getAndSet(0);
+        first_send_time_  = 0;
+        latest_send_time_ = 0;
+        target_           = nullptr;
+        if (timeout_timer_)
+        {
+            timeout_timer_->cancel();
+            timeout_timer_ = nullptr;
+        }
+        user_msg_set_.clear();
+        fail_create_conn_.getAndSet(0);
+        LOG_TRACE("reset senfbuf(uid:%d) successfully", record_uid);
+    }
+
+    bool isPacked() const { return is_packed_; }
+    void setIsPacked(bool is_packed) { is_packed_ = is_packed; }
+
+    bool isUsed() const { return is_used_; }
+    void setIsUsed(bool is_used) { is_used_ = is_used; }
+};
+
+}  // namespace dataproxy_sdk
+
+#endif  // DATAPROXY_SDK_NET_SEND_BUFFER_H_
\ No newline at end of file

Reply via email to