This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c33c8aab [INLONG-5249][SDK] DataProxy SDK(cpp) utils file (#5250)
3c33c8aab is described below

commit 3c33c8aaba05e8b83c750bcdf8cb249db950b3bb
Author: xueyingzhang <[email protected]>
AuthorDate: Fri Jul 29 17:40:14 2022 +0800

    [INLONG-5249][SDK] DataProxy SDK(cpp) utils file (#5250)
---
 .../dataproxy-sdk-cpp/src/base/atomic.h            | 101 +++++
 .../dataproxy-sdk-cpp/src/base/ini_help.cc         | 292 ++++++++++++++
 .../dataproxy-sdk-cpp/src/base/ini_help.h          |  94 +++++
 .../dataproxy-sdk-cpp/src/base/logger.cc           |  97 +++++
 .../dataproxy-sdk-cpp/src/base/logger.h            | 128 ++++++
 .../dataproxy-sdk-cpp/src/base/msg_protocol.h      |  90 +++++
 .../dataproxy-sdk-cpp/src/base/noncopyable.h       |  37 ++
 .../dataproxy-sdk-cpp/src/base/read_write_mutex.h  | 133 +++++++
 .../dataproxy-sdk-cpp/src/base/recv_buffer.h       | 419 ++++++++++++++++++++
 .../dataproxy-sdk-cpp/src/base/sdk_constant.h      | 102 +++++
 .../dataproxy-sdk-cpp/src/base/singleton.h         |  63 +++
 .../dataproxy-sdk-cpp/src/base/utils.cc            | 435 +++++++++++++++++++++
 .../dataproxy-sdk-cpp/src/base/utils.h             |  84 ++++
 13 files changed, 2075 insertions(+)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/atomic.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/atomic.h
new file mode 100644
index 000000000..6babab6a0
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/atomic.h
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_ATOMIC_H_
+#define DATAPROXY_SDK_BASE_ATOMIC_H_
+
+#include <stdint.h>
+
+#include "noncopyable.h"
+namespace dataproxy_sdk {
+template <typename T>
+class AtomicIntegerT : noncopyable {
+private:
+    volatile T value_;
+
+public:
+    AtomicIntegerT() : value_(0) {}
+
+    explicit AtomicIntegerT(T value) : value_(value) {}
+
+    // if value_ equals oldval, update it as newval and return true;
+    // otherwise, return false
+    inline bool compareAndSwap(T oldval, T newval)
+    {
+        return __sync_bool_compare_and_swap(&value_, oldval, newval);
+    }
+
+    inline T get()
+    {
+        return __sync_val_compare_and_swap(&value_, 0, 0);
+    }
+
+    inline T getAndAdd(T x)
+    {
+        return __sync_fetch_and_add(&value_, x);
+    }
+
+    inline T getAndIncrease()
+    {
+        return __sync_fetch_and_add(&value_, 1);
+    }
+
+    inline T addAndGet(T x)
+    {
+        return getAndAdd(x) + x;
+    }
+
+    inline T incrementAndGet()
+    {
+        return addAndGet(1);
+    }
+
+    inline T decrementAndGet()
+    {
+        return addAndGet(-1);
+    }
+
+    inline void add(T x)
+    {
+        getAndAdd(x);
+    }
+
+    inline void increment()
+    {
+        incrementAndGet();
+    }
+
+    inline void decrement()
+    {
+        decrementAndGet();
+    }
+
+    inline T getAndSet(T newValue)
+    {
+        return __sync_lock_test_and_set(&value_, newValue);
+    }
+};
+
+using AtomicInt = AtomicIntegerT<int32_t>;
+
+using AtomicUInt = AtomicIntegerT<uint32_t>;
+
+}  // namespace dataproxy_sdk
+
+#endif  // DATAPROXY_SDK_BASE_ATOMIC_H_
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/ini_help.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/ini_help.cc
new file mode 100644
index 000000000..9959bafb7
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/ini_help.cc
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "ini_help.h"
+
+#include <ctype.h>
+#include <fstream>
+#include <iostream>
+#include <stdio.h>
+#include <stdlib.h>
+
+namespace dataproxy_sdk
+{
+
+/**
+ * @description: parse line content
+ * @return {*} true if success
+ * @param {string&} content
+ * @param {string*} key
+ * @param {string*} value
+ */
+bool IniFile::parse(const std::string& content, std::string* key, std::string* 
value) { return split(content, "=", key, value); }
+
+int IniFile::updateSection(const std::string& cleanLine,
+                           const std::string& comment,
+                           const std::string& rightComment,
+                           IniSection** section)
+{
+    IniSection* newSection;
+    // find ']'
+    size_t index = cleanLine.find_first_of(']');
+    if (index == std::string::npos)
+    {
+        err_msg_ = std::string("no matched ] found");
+        return 1;
+    }
+
+    int len = index - 1;
+
+    if (len <= 0)
+    {
+        err_msg_ = std::string("section name is empty");
+        return 1;
+    }
+
+    // set section name
+    std::string s(cleanLine, 1, len);
+
+    trim(s);
+
+    //check section 
+    if (getSection(s) != NULL)
+    {
+        err_msg_ = std::string("section ") + s + std::string("already exist");
+        return 1;
+    }
+
+    newSection = new IniSection();
+    newSection->name = s;
+    newSection->comment      = comment;
+    newSection->rightComment = rightComment;
+
+    sections_.push_back(newSection);
+
+    *section = newSection;
+
+    return 0;
+}
+
+int IniFile::addKV(const std::string& cleanLine,
+                             const std::string& comment,
+                             const std::string& rightComment,
+                             IniSection* section)
+{
+    std::string key, value;
+
+    if (!parse(cleanLine, &key, &value))
+    {
+        err_msg_ = std::string("parse line failed:") + cleanLine;
+        return 1;
+    }
+
+    Iterm item;
+    item.key          = key;
+    item.value        = value;
+    item.comment      = comment;
+    item.rightComment = rightComment;
+
+    section->items.push_back(item);
+
+    return 0;
+}
+
+int IniFile::load(const std::string& fileName)
+{
+    int err;
+    std::string line;
+    std::string comment;
+    std::string rightComment;
+    IniSection* currSection = NULL;  //init a section
+
+    close();
+
+    ini_file_name_ = fileName;
+    std::ifstream istream(ini_file_name_);
+    if (!istream.is_open())
+    {
+        err_msg_ = std::string("open") + ini_file_name_ + std::string(" file 
failed");
+        return 1;
+    }
+
+    // add new section
+    currSection       = new IniSection();
+    currSection->name = "";
+    sections_.push_back(currSection);
+
+    // read line
+    while (std::getline(istream, line))
+    {
+        trim(line);
+
+        // skip empty
+        if (line.length() <= 0)
+        {
+            comment += delim;
+            continue;
+        }
+
+        // whether contains section or key
+        // find '['
+        if (line[0] == '[') { err = updateSection(line, comment, rightComment, 
&currSection); }
+        else
+        {
+            err = addKV(line, comment, rightComment, currSection);
+        }
+
+        if (err != 0)
+        {
+            istream.close();
+            return err;
+        }
+
+        // clear
+        comment      = "";
+        rightComment = "";
+    }
+
+    istream.close();
+
+    return 0;
+}
+
+IniSection* IniFile::getSection(const std::string& section)
+{
+    for (SectionIterator it = sections_.begin(); it != sections_.end(); ++it)
+    {
+        if ((*it)->name == section) { return *it; }
+    }
+
+    return NULL;
+}
+
+int IniFile::getString(const std::string& section, const std::string& key, 
std::string* value)
+{
+    return getValue(section, key, value);
+}
+
+int IniFile::getInt(const std::string& section, const std::string& key, int* 
intValue)
+{
+    int err;
+    std::string strValue;
+
+    err = getValue(section, key, &strValue);
+
+    *intValue = atoi(strValue.c_str());
+
+    return err;
+}
+
+int IniFile::getValue(const std::string& section, const std::string& key, 
std::string* value)
+{
+    std::string comment;
+    return getValue(section, key, value, &comment);
+}
+
+int IniFile::getValue(const std::string& section, const std::string& key, 
std::string* value, std::string* comment)
+{
+    IniSection* sect = getSection(section);
+
+    if (sect == NULL)
+    {
+        err_msg_ = std::string("not find the section ") + section;
+        return 1;
+    }
+
+    for (IniSection::ItermIterator it = sect->begin(); it != sect->end(); ++it)
+    {
+        if (it->key == key)
+        {
+            *value   = it->value;
+            *comment = it->comment;
+            return 0;
+        }
+    }
+
+    err_msg_ = std::string("not find the key ") + key;
+    return 1;
+}
+
+void IniFile::close()
+{
+    ini_file_name_ = "";
+
+    for (SectionIterator it = sections_.begin(); it != sections_.end(); ++it)
+    {
+        delete (*it);  // clear section
+    }
+
+    sections_.clear();
+}
+
+void IniFile::trim(std::string& str)
+{
+    int len = str.length();
+
+    int i = 0;
+
+    while ((i < len) && isspace(str[i]) && (str[i] != '\0'))
+    {
+        i++;
+    }
+
+    if (i != 0) { str = std::string(str, i, len - i); }
+
+    len = str.length();
+
+    for (i = len - 1; i >= 0; --i)
+    {
+        if (!isspace(str[i])) { break; }
+    }
+
+    str = std::string(str, 0, i + 1);
+}
+
+bool IniFile::split(const std::string& str, const std::string& sep, 
std::string* pleft, std::string* pright)
+{
+    size_t pos = str.find(sep);
+    std::string left, right;
+
+    if (pos != std::string::npos)
+    {
+        left  = std::string(str, 0, pos);
+        right = std::string(str, pos + 1);
+
+        trim(left);
+        trim(right);
+
+        *pleft  = left;
+        *pright = right;
+        return true;
+    }
+    else
+    {
+        left  = str;
+        right = "";
+
+        trim(left);
+
+        *pleft  = left;
+        *pright = right;
+        return false;
+    }
+}
+
+
+}  // namespace dataproxy_sdk
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/ini_help.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/ini_help.h
new file mode 100644
index 000000000..235752652
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/ini_help.h
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_INI_HELP_H_
+#define DATAPROXY_SDK_BASE_INI_HELP_H_
+
+#include <algorithm>
+#include <string.h>
+#include <string>
+#include <vector>
+
+namespace dataproxy_sdk
+{
+const char delim[] = "\n";
+struct Iterm
+{
+    std::string key;
+    std::string value;
+    std::string comment;
+    std::string rightComment;
+};
+
+struct IniSection
+{
+    using ItermIterator = std::vector<Iterm>::iterator; 
+    ItermIterator begin()
+    {
+        return items.begin();
+    }
+
+    ItermIterator end()
+    {
+        return items.end(); 
+    }
+
+    std::string name;
+    std::string comment;
+    std::string rightComment;
+    std::vector<Iterm> items;
+};
+
+class IniFile
+{
+  public:
+    IniFile(){}
+    ~IniFile() { close(); }
+
+    int load(const std::string& fileName);
+    int getString(const std::string& section, const std::string& key, 
std::string* value);
+    int getInt(const std::string& section, const std::string& key, int* value);
+
+  private:
+    IniSection* getSection(const std::string& section = "");
+    static void trim(std::string& str);
+    int updateSection(const std::string& cleanLine,
+                      const std::string& comment,
+                      const std::string& rightComment,
+                      IniSection** section);
+    int addKV(const std::string& cleanLine,
+                        const std::string& comment,
+                        const std::string& rightComment,
+                        IniSection* section);
+    void close();
+    bool split(const std::string& str, const std::string& sep, std::string* 
left, std::string* right);
+    bool parse(const std::string& content, std::string* key, std::string* 
value);
+    int getValue(const std::string& section, const std::string& key, 
std::string* value);
+    int getValue(const std::string& section, const std::string& key, 
std::string* value, std::string* comment);
+
+  private:
+    using SectionIterator = std::vector<IniSection*>::iterator;
+    std::vector<IniSection*> sections_;
+    std::string ini_file_name_;
+    std::string err_msg_;  // save err msg
+};
+
+}  // namespace dataproxy_sdk
+
+#endif  // DATAPROXY_SDK_BASE_INI_HELP_H_
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.cc
new file mode 100644
index 000000000..a4fa43e6b
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.cc
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "logger.h"
+
+#include <log4cplus/consoleappender.h>
+#include <log4cplus/fileappender.h>
+#include <log4cplus/layout.h>
+#include <log4cplus/logger.h>
+#include <log4cplus/loggingmacros.h>
+#include <stdarg.h>
+
+#include <string>
+
+#include "singleton.h"
+
+namespace dataproxy_sdk
+{
+Logger& getLogger() { return Singleton<Logger>::instance(); }
+
+bool Logger::init(uint32_t file_max_size,
+                  uint32_t file_num,
+                  uint8_t level,
+                  uint8_t output_type,
+                  bool enable_limit,
+                  const std::string& base_path,
+                  const std::string& logname)
+{
+    file_max_size_ = file_max_size;
+    file_num_      = file_num;
+    level_         = level;
+    output_type_   = output_type;
+    enable_limit_  = enable_limit;
+    base_path_     = base_path;
+    log_name_      = logname;
+    setUp();
+    return true;
+}
+
+bool Logger::write(const char* format, ...)
+{
+    char buf[8192];
+    va_list ap;
+    va_start(ap, format);
+    vsnprintf(buf, sizeof(buf) - 1, format, ap);
+    va_end(ap);
+    return writeCharStream(buf);
+}
+
+bool Logger::writeCharStream(const char* log)
+{
+    auto logger = log4cplus::Logger::getInstance(instance_);
+    logger.forcedLog(log4cplus::TRACE_LOG_LEVEL, log);
+    return true;
+}
+
+void Logger::setUp()
+{
+    bool immediate_flush = true;
+    std::string pattern = "[%D{%Y-%m-%d %H:%M:%S.%q}]%m%n";
+    auto logger_d       = log4cplus::Logger::getInstance(instance_);
+    logger_d.removeAllAppenders();
+    logger_d.setLogLevel(log4cplus::TRACE_LOG_LEVEL);
+
+    if (output_type_ == 2)
+    {  //file
+        log4cplus::SharedAppenderPtr fileAppender(new 
log4cplus::RollingFileAppender(
+            base_path_ + log_name_, file_max_size_ * kMBSize, file_num_, 
immediate_flush, true));
+        std::unique_ptr<log4cplus::Layout> layout(new 
log4cplus::PatternLayout(pattern));
+        fileAppender->setLayout(std::move(layout));
+        logger_d.addAppender(fileAppender);
+    }
+    else
+    {  //console
+        log4cplus::SharedAppenderPtr consoleAppender(new 
log4cplus::ConsoleAppender());
+        consoleAppender->setLayout(std::unique_ptr<log4cplus::Layout>(new 
log4cplus::SimpleLayout()));
+        logger_d.addAppender(consoleAppender);
+    }
+}
+
+}  // namespace dataproxy_sdk
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.h
new file mode 100644
index 000000000..e7582f25f
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.h
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_LOGGER_H_
+#define DATAPROXY_SDK_BASE_LOGGER_H_
+
+#include <stdint.h>
+#include <string.h>
+#include <string>
+#include <unistd.h>
+#include <vector>
+
+#include "sdk_constant.h"
+
+namespace dataproxy_sdk
+{
+static const uint32_t kMBSize = 1024 * 1024;
+const uint32_t kPid           = getpid();
+
+class Logger;
+
+Logger& getLogger();
+
+// only show fileName
+#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : 
__FILE__)
+
+#define LOG_LEVEL(level, fmt, ...)                                             
                                                  \
+    {                                                                          
                                                  \
+        if (dataproxy_sdk::getLogger().enableLevel(level))                     
                                                      \
+        {                                                                      
                                                  \
+            dataproxy_sdk::getLogger().write("[pid:%d][%s][%s:%s:%d]" fmt, 
kPid, dataproxy_sdk::Logger::level2String(level),             \
+                                         __FILENAME__, __func__, __LINE__, 
##__VA_ARGS__);                                       \
+        }                                                                      
                                                  \
+    }
+
+#define LOG_TRACE(fmt, ...) LOG_TDBUSCAPI(dataproxy_sdk::getLogger(), 
dataproxy_sdk::Logger::kLogTrace, fmt, ##__VA_ARGS__)
+#define LOG_DEBUG(fmt, ...) LOG_TDBUSCAPI(dataproxy_sdk::getLogger(), 
dataproxy_sdk::Logger::kLogDebug, fmt, ##__VA_ARGS__)
+#define LOG_INFO(fmt, ...) LOG_TDBUSCAPI(dataproxy_sdk::getLogger(), 
dataproxy_sdk::Logger::kLogInfo, fmt, ##__VA_ARGS__)
+#define LOG_WARN(fmt, ...) LOG_TDBUSCAPI(dataproxy_sdk::getLogger(), 
dataproxy_sdk::Logger::kLogWarn, fmt, ##__VA_ARGS__)
+#define LOG_ERROR(fmt, ...) LOG_TDBUSCAPI(dataproxy_sdk::getLogger(), 
dataproxy_sdk::Logger::kLogError, fmt, ##__VA_ARGS__)
+
+#define LOG_TDBUSCAPI(logger, level, fmt, ...)                                 
                                                  \
+    {                                                                          
                                                  \
+        if (logger.enableLevel(level))                                         
                                                  \
+        {                                                                      
                                                  \
+            logger.write("[pid:%d][%s][%s:%s:%d]" fmt, kPid, 
dataproxy_sdk::Logger::level2String(level), __FILENAME__, __func__,     \
+                         __LINE__, ##__VA_ARGS__);                             
                                                  \
+        }                                                                      
                                                  \
+    }
+
+class Logger
+{
+  public:
+    enum Level {
+        kLogError = 0,
+        kLogWarn  = 1,
+        kLogInfo  = 2,
+        kLogDebug = 3,
+        kLogTrace = 4,
+    };
+
+  private:
+    uint32_t file_max_size_;
+    uint32_t file_num_;
+    uint8_t level_;
+    uint8_t output_type_;  //2->file, 1->console
+    bool enable_limit_;
+    std::string base_path_;
+    std::string log_name_;
+
+    std::string instance_;
+
+  public:
+    Logger()
+        : file_max_size_(10)
+        , file_num_(10)
+        , level_(kLogInfo)
+        , output_type_(2)
+        , enable_limit_(true)
+        , base_path_("./logs/")
+        , instance_("DataProxySDK")
+    {
+    }
+
+    ~Logger() {}
+
+    bool init(uint32_t file_max_size,
+              uint32_t file_num,
+              uint8_t level,
+              uint8_t output_type,
+              bool enable_limit,
+              const std::string& base_path,
+              const std::string& logname = constants::kLogName);
+    bool write(const char* sFormat, ...) __attribute__((format(printf, 2, 3)));
+    inline bool writeStream(const std::string& msg) { return 
writeCharStream(msg.c_str()); }
+    inline bool enableLevel(Level level) { return ((level <= level_) ? true : 
false); }
+    static const char* level2String(Level level)
+    {
+        static const char* level_names[] = {
+            "ERROR", "WARN", "INFO", "DEBUG", "TRACE",
+        };
+        return level_names[level];
+    }
+
+  private:
+    void setUp();
+    bool writeCharStream(const char* msg);
+};
+
+}  // namespace dataproxy_sdk
+
+#endif  // CPAI_BASE_LOGGER_H_
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/msg_protocol.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/msg_protocol.h
new file mode 100644
index 000000000..844c9a6c8
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/msg_protocol.h
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_MSG_PROTOCOL_H_
+#define DATAPROXY_SDK_BASE_MSG_PROTOCOL_H_
+
+#include "send_buffer.h"
+#include <memory>
+#include <stdint.h>
+#include <string>
+#include <vector>
+
+namespace dataproxy_sdk
+{
+// msg_type 1~6
+// totalLen(4)|msgtype(1)|bodyLen(4)|body(x)|attrLen(4)|attr(attr_len)
+#pragma pack(1)
+struct ProtocolMsgHead
+{
+    uint32_t total_len;
+    char msg_type;
+};
+struct ProtocolMsgBody
+{
+    uint32_t body_len;
+    char body[0];
+};
+struct ProtocolMsgTail
+{
+    uint32_t attr_len;
+    char attr[0];
+};
+// msg_type=7
+// 
totalLen(4)|msgtype(1)|bid_num(2)|tid_num(2)|ext_field(2)|data_time(4)|cnt(2)|uniq(4)|bodyLen(4)|body(x)|attrLen(4)|attr(attr_len)|magic(2)
+struct BinaryMsgHead
+{
+    uint32_t total_len;
+    char msg_type;
+    uint16_t bid_num;
+    uint16_t tid_num;
+    uint16_t ext_field;
+    uint32_t data_time;  //second, last pack time
+    uint16_t cnt;
+    uint32_t uniq;
+};
+// msg_type7 ack pack
+struct BinaryMsgAck
+{
+    uint32_t total_len;
+    char msg_type;
+    uint32_t uniq;  //buffer id
+    uint16_t attr_len;
+    char attr[0];
+    uint16_t magic;
+};
+
+// binary hb and hb ack, msg_type=8; hb ack: body_ver is 1, if body_len is 2, 
there is load in body
+struct BinaryHB
+{
+    uint32_t total_len;
+    char msg_type;
+    uint32_t data_time;  // second
+    uint8_t body_ver;    // body_ver=1
+    uint32_t body_len;   // body_len=0
+    char body[0];
+    uint16_t attr_len;  // attr_len=0;
+    char attr[0];
+    uint16_t magic;
+};
+#pragma pack()
+
+}  // namespace dataproxy_sdk
+
+#endif  // DATAPROXY_SDK_BASE_MSG_PROTOCOL_H_
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/noncopyable.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/noncopyable.h
new file mode 100644
index 000000000..e3569f1ee
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/noncopyable.h
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_NONCOPYABLE_H_
+#define DATAPROXY_SDK_BASE_NONCOPYABLE_H_
+
+namespace dataproxy_sdk
+{
+class noncopyable
+{
+  public:
+    noncopyable(const noncopyable&) = delete;
+    void operator=(const noncopyable&) = delete;
+
+  protected:
+    noncopyable()  = default;
+    ~noncopyable() = default;
+};
+}  // namespace dataproxy_sdk
+
+#endif  // DATAPROXY_SDK_BASE_NONCOPYABLE_H_
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/read_write_mutex.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/read_write_mutex.h
new file mode 100644
index 000000000..d23128229
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/read_write_mutex.h
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_READ_WRITE_MUTEX_H_
+#define DATAPROXY_SDK_BASE_READ_WRITE_MUTEX_H_
+
+#include <condition_variable>
+#include <mutex>
+
+namespace dataproxy_sdk
+{
+// wirte operation add lock:unique_read_lock<read_write_mutex> lock( rwmutex );
+// read operation add lock:unique_write_lock<read_write_mutex> lock(rwmutex);
+
+class read_write_mutex
+{
+  public:
+    read_write_mutex()  = default;
+    ~read_write_mutex() = default;
+
+    read_write_mutex(const read_write_mutex&) = delete;
+    read_write_mutex& operator=(const read_write_mutex&) = delete;
+
+    read_write_mutex(const read_write_mutex&&) = delete;
+    read_write_mutex& operator=(const read_write_mutex&&) = delete;
+
+    void lock_read()
+    {
+        std::unique_lock<std::mutex> lock(m_mutex);
+        m_cond_read.wait(lock, [this]() -> bool { return m_write_count == 0; 
});
+        ++m_read_count;
+    }
+
+    void unlock_read()
+    {
+        std::unique_lock<std::mutex> lock(m_mutex);
+        if (--m_read_count == 0 && m_write_count > 0) { 
m_cond_write.notify_one(); }
+    }
+
+    void lock_write()
+    {
+        std::unique_lock<std::mutex> lock(m_mutex);
+        ++m_write_count;
+        m_cond_write.wait(lock, [this]() -> bool { return m_read_count == 0 && 
!m_writing; });
+        m_writing = true;
+    }
+
+    void unlock_write()
+    {
+        std::unique_lock<std::mutex> lock(m_mutex);
+        if (--m_write_count == 0) { m_cond_read.notify_all(); }
+        else
+        {
+            m_cond_write.notify_one();
+        }
+        m_writing = false;
+    }
+
+  private:
+    volatile size_t m_read_count  = 0;
+    volatile size_t m_write_count = 0;
+    volatile bool m_writing       = false;
+    mutable std::mutex m_mutex;  // KEYPOINT: add mutable
+    std::condition_variable m_cond_read;
+    std::condition_variable m_cond_write;
+};
+
+template <typename _ReadWriteLock>
+class unique_read_lock
+{
+  public:
+    explicit unique_read_lock(_ReadWriteLock& rwLock) : m_ptr_rw_lock(&rwLock) 
{ m_ptr_rw_lock->lock_read(); }
+
+    ~unique_read_lock()
+    {
+        if (m_ptr_rw_lock) { m_ptr_rw_lock->unlock_read(); }
+    }
+
+    unique_read_lock()                        = delete;
+    unique_read_lock(const unique_read_lock&) = delete;
+    unique_read_lock& operator=(const unique_read_lock&) = delete;
+    unique_read_lock(const unique_read_lock&&)           = delete;
+    unique_read_lock& operator=(const unique_read_lock&&) = delete;
+
+  private:
+    _ReadWriteLock* m_ptr_rw_lock = nullptr;
+};
+
+template <typename _ReadWriteLock>
+class unique_write_lock
+{
+  public:
+    explicit unique_write_lock(_ReadWriteLock& rwLock) : 
m_ptr_rw_lock(&rwLock) { m_ptr_rw_lock->lock_write(); }
+
+    ~unique_write_lock()
+    {
+        if (m_ptr_rw_lock) { m_ptr_rw_lock->unlock_write(); }
+    }
+
+    unique_write_lock()                         = delete;
+    unique_write_lock(const unique_write_lock&) = delete;
+    unique_write_lock& operator=(const unique_write_lock&) = delete;
+    unique_write_lock(const unique_write_lock&&)           = delete;
+    unique_write_lock& operator=(const unique_write_lock&&) = delete;
+
+    void unlock()
+    {
+        if (m_ptr_rw_lock) { m_ptr_rw_lock->unlock_write(); }
+    }
+
+  private:
+    _ReadWriteLock* m_ptr_rw_lock = nullptr;
+};
+
+}  // namespace dataproxy_sdk
+
+#endif  // DATAPROXY_SDK_BASE_READ_WRITE_MUTEX_H_
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/recv_buffer.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/recv_buffer.h
new file mode 100644
index 000000000..8e9977679
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/recv_buffer.h
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_RECV_BUFFER_H_
+#define DATAPROXY_SDK_BASE_RECV_BUFFER_H_
+
+#include <arpa/inet.h>
+#include <assert.h>
+#include <stdint.h>
+#include <string.h>
+
+#include <algorithm>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include "logger.h"
+
+namespace dataproxy_sdk
+{
+class RecvBuffer;
+using RecvBufferPtr = std::shared_ptr<RecvBuffer>;
+
+//store ack data from dataproxy
+class RecvBuffer
+{
+  public:
+    char* buffer_;
+    size_t capacity_;
+    size_t read_index_;
+    size_t write_index_;
+    size_t reserved_prepend_size_;
+    bool has_mem_{true};
+
+  public:
+    enum {
+        kCheapPrependSize = 0,
+        kInitialSize      = 1024 * 1024,
+    };
+
+    explicit RecvBuffer(size_t initial_size = kInitialSize, size_t 
reserved_prepend_size = kCheapPrependSize)
+        : capacity_(reserved_prepend_size + initial_size)
+        , read_index_(reserved_prepend_size)
+        , write_index_(reserved_prepend_size)
+        , reserved_prepend_size_(reserved_prepend_size)
+    {
+        buffer_ = new char[capacity_];
+        has_mem_ = true;
+        assert(length() == 0);
+        assert(WritableBytes() == initial_size);
+        assert(PrependableBytes() == reserved_prepend_size);
+    }
+
+    ~RecvBuffer()
+    {
+        if (has_mem_ && buffer_)
+        {
+            delete[] buffer_;
+        }
+        buffer_   = nullptr;
+        capacity_ = 0;
+    }
+
+    std::string String()
+    {
+        char buf[1024];
+        snprintf(buf, sizeof(buf), 
"buffer:%p,capacity:%ld,readindex:%ld,writeindex:%ld,prependsize:%ld,hasmem:%d",
 buffer_,
+                 capacity_, read_index_, write_index_, reserved_prepend_size_, 
has_mem_);
+        return buf;
+    }
+
+    RecvBufferPtr Slice()
+    {
+        auto buff      = std::make_shared<RecvBuffer>(*this);
+        buff->has_mem_ = false;
+        return buff;
+    }
+
+    void Swap(RecvBuffer& rhs)
+    {
+        std::swap(buffer_, rhs.buffer_);
+        std::swap(capacity_, rhs.capacity_);
+        std::swap(read_index_, rhs.read_index_);
+        std::swap(write_index_, rhs.write_index_);
+        std::swap(reserved_prepend_size_, rhs.reserved_prepend_size_);
+    }
+
+    // Skip advances the reading index of the buffer
+    void Skip(size_t len)
+    {
+        if (len < length()) { read_index_ += len; }
+        else
+        {
+            Reset();
+        }
+    }
+
+    // Retrieve advances the reading index of the buffer
+    // Retrieve it the same as Skip.
+    void Retrieve(size_t len) { Skip(len); }
+
+    // Truncate discards all but the first n unread bytes from the buffer
+    // but continues to use the same allocated storage.
+    // It does nothing if n is greater than the length of the buffer.
+    void Truncate(size_t n)
+    {
+        if (n == 0)
+        {
+            read_index_  = reserved_prepend_size_;
+            write_index_ = reserved_prepend_size_;
+        }
+        else if (write_index_ > read_index_ + n)
+        {
+            write_index_ = read_index_ + n;
+        }
+    }
+
+    // Reset resets the buffer to be empty,
+    // but it retains the underlying storage for use by future writes.
+    // Reset is the same as Truncate(0).
+    void Reset() { Truncate(0); }
+
+    // Increase the capacity of the container to a value that's greater
+    // or equal to len. If len is greater than the current capacity(),
+    // new storage is allocated, otherwise the method does nothing.
+    void Reserve(size_t len)
+    {
+        if (capacity_ >= len + reserved_prepend_size_) { return; }
+
+        grow(len + reserved_prepend_size_);
+    }
+
+    // Make sure there is enough memory space to append more data with length 
len
+    void EnsureWritableBytes(size_t len)
+    {
+        if (WritableBytes() < len) { grow(len); }
+
+        assert(WritableBytes() >= len);
+    }
+
+    // ToText appends char '\0' to buffer to convert the underlying data to a 
c-style string text.
+    // It will not change the length of buffer.
+    void ToText()
+    {
+        AppendInt8('\0');
+        UnwriteBytes(1);
+    }
+
+    // Write
+  public:
+    void Write(const void* /*restrict*/ d, size_t len)
+    {
+        EnsureWritableBytes(len);
+        memcpy(WriteBegin(), d, len);
+        assert(write_index_ + len <= capacity_);
+        write_index_ += len;
+    }
+
+    void Append(const char* /*restrict*/ d, size_t len) { Write(d, len); }
+
+    void Append(const void* /*restrict*/ d, size_t len) { Write(d, len); }
+
+    void AppendInt32(int32_t x)
+    {
+        int32_t be32 = htonl(x);
+        Write(&be32, sizeof be32);
+    }
+
+    void AppendInt16(int16_t x)
+    {
+        int16_t be16 = htons(x);
+        Write(&be16, sizeof be16);
+    }
+
+    void AppendInt8(int8_t x) { Write(&x, sizeof x); }
+
+    void PrependInt32(int32_t x)
+    {
+        int32_t be32 = htonl(x);
+        Prepend(&be32, sizeof be32);
+    }
+
+    void PrependInt16(int16_t x)
+    {
+        int16_t be16 = htons(x);
+        Prepend(&be16, sizeof be16);
+    }
+
+    void PrependInt8(int8_t x) { Prepend(&x, sizeof x); }
+
+    // Insert content, specified by the parameter, into the front of reading 
index
+    void Prepend(const void* /*restrict*/ d, size_t len)
+    {
+        assert(len <= PrependableBytes());
+        read_index_ -= len;
+        const char* p = static_cast<const char*>(d);
+        memcpy(begin() + read_index_, p, len);
+    }
+
+    void UnwriteBytes(size_t n)
+    {
+        assert(n <= length());
+        write_index_ -= n;
+    }
+
+    void WriteBytes(size_t n)
+    {
+        assert(n <= WritableBytes());
+        write_index_ += n;
+    }
+
+    // Read
+  public:
+    // Peek int32_t/int16_t/int8_t with network endian
+
+    uint32_t ReadUint32()
+    {
+        uint32_t result = PeekUint32();
+        Skip(sizeof result);
+        return result;
+    }
+
+    int32_t ReadInt32()
+    {
+        int32_t result = PeekInt32();
+        Skip(sizeof result);
+        return result;
+    }
+
+    int16_t ReadInt16()
+    {
+        int16_t result = PeekInt16();
+        Skip(sizeof result);
+        return result;
+    }
+
+    uint16_t ReadUint16()
+    {
+        uint16_t result = PeekUint16();
+        Skip(sizeof result);
+        return result;
+    }
+
+    int8_t ReadInt8()
+    {
+        int8_t result = PeekInt8();
+        Skip(sizeof result);
+        return result;
+    }
+
+    uint8_t ReadUint8()
+    {
+        uint8_t result = PeekUint8();
+        Skip(sizeof result);
+        return result;
+    }
+
+    std::string ToString() const { return std::string(data(), length()); }
+
+    // ReadByte reads and returns the next byte from the buffer.
+    // If no byte is available, it returns '\0'.
+    char ReadByte()
+    {
+        assert(length() >= 1);
+
+        if (length() == 0) { return '\0'; }
+
+        return buffer_[read_index_++];
+    }
+
+    // UnreadBytes unreads the last n bytes returned
+    // by the most recent read operation.
+    void UnreadBytes(size_t n)
+    {
+        assert(n < read_index_);
+        read_index_ -= n;
+    }
+
+    // Peek
+  public:
+    // Peek int64_t/int32_t/int16_t/int8_t with network endian
+
+    uint32_t PeekUint32() const
+    {
+        // LOG_DEBUG("recv_buf->length() is%d", length());
+        assert(length() >= sizeof(uint32_t));
+        uint32_t be32 = 0;
+        ::memcpy(&be32, data(), sizeof be32);
+        return ntohl(be32);
+        // LOG_DEBUG
+    }
+
+    int32_t PeekInt32() const
+    {
+        assert(length() >= sizeof(int32_t));
+        int32_t be32 = 0;
+        ::memcpy(&be32, data(), sizeof be32);
+        return ntohl(be32);
+    }
+
+    int16_t PeekInt16() const
+    {
+        assert(length() >= sizeof(int16_t));
+        int16_t be16 = 0;
+        ::memcpy(&be16, data(), sizeof be16);
+        return ntohs(be16);
+    }
+
+    uint16_t PeekUint16() const
+    {
+        assert(length() >= sizeof(uint16_t));
+        uint16_t be16 = 0;
+        ::memcpy(&be16, data(), sizeof be16);
+        return ntohs(be16);
+    }
+
+    int8_t PeekInt8() const
+    {
+        assert(length() >= sizeof(int8_t));
+        int8_t x = *data();
+        return x;
+    }
+
+    uint8_t PeekUint8() const
+    {
+        assert(length() >= sizeof(uint8_t));
+        uint8_t x = *data();
+        return x;
+    }
+
+  public:
+    // data returns a pointer of length Buffer.length() holding the unread 
portion of the buffer.
+    // The data is valid for use only until the next buffer modification (that 
is,
+    // only until the next call to a method like Read, Write, Reset, or 
Truncate).
+    // The data aliases the buffer content at least until the next buffer 
modification,
+    // so immediate changes to the slice will affect the result of future 
reads.
+    const char* data() const { return buffer_ + read_index_; }
+
+    char* WriteBegin() { return begin() + write_index_; }
+
+    const char* WriteBegin() const { return begin() + write_index_; }
+
+    // length returns the number of bytes of the unread portion of the buffer
+    size_t length() const
+    {
+        assert(write_index_ >= read_index_);
+        return write_index_ - read_index_;
+    }
+
+    // size returns the number of bytes of the unread portion of the buffer.
+    // It is the same as length().
+    size_t size() const { return length(); }
+
+    // capacity returns the capacity of the buffer's underlying byte slice, 
that is, the
+    // total space allocated for the buffer's data.
+    size_t capacity() const { return capacity_; }
+
+    size_t WritableBytes() const
+    {
+        assert(capacity_ >= write_index_);
+        return capacity_ - write_index_;
+    }
+
+    size_t PrependableBytes() const { return read_index_; }
+
+  public:
+    char* begin() { return buffer_; }
+
+    const char* begin() const { return buffer_; }
+
+    void grow(size_t len)
+    {
+        if (!has_mem_) { return; }
+        if (WritableBytes() + PrependableBytes() < len + 
reserved_prepend_size_)
+        {
+            // grow the capacity
+            size_t n = (capacity_ << 1) + len;
+            size_t m = length();
+            char* d  = new char[n];
+            memcpy(d + reserved_prepend_size_, begin() + read_index_, m);
+            write_index_ = m + reserved_prepend_size_;
+            read_index_  = reserved_prepend_size_;
+            capacity_    = n;
+            delete[] buffer_;
+            buffer_ = d;
+        }
+        else
+        {
+            // move readable data to the front, make space inside buffer
+            assert(reserved_prepend_size_ < read_index_);
+            size_t readable = length();
+            memmove(begin() + reserved_prepend_size_, begin() + read_index_, 
length());
+            read_index_  = reserved_prepend_size_;
+            write_index_ = read_index_ + readable;
+            assert(readable == length());
+            assert(WritableBytes() >= len);
+        }
+    }
+};
+}  // namespace dataproxy_sdk
+
+#endif  // DATAPROXY_SDK_BASE_RECV_BUFFER_H_
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_constant.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_constant.h
new file mode 100644
index 000000000..ef2c9514a
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_constant.h
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_CONSTANT_H_
+#define DATAPROXY_SDK_BASE_CONSTANT_H_
+
+#include <stdint.h>
+#include <string>
+
+namespace dataproxy_sdk
+{
+    namespace constants
+    {
+        // load weight
+        static const int32_t kWeight[30] = {1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 
3, 3, 3, 3, 6, 6, 6, 6, 6, 12, 12, 12, 12, 12, 48, 96, 192, 384, 1000};
+
+        static const int32_t kPrime[6] = {1, 3, 5, 7, 11, 13};
+        static const int32_t kPrimeSize = 6;
+
+        static const int32_t kMaxRequestTDMTimes = 4;
+        static const int32_t kMaxRetryConnection = 20;                         
                                  //create conn failed more than 20 times, 
start sendbuf callback
+        static const std::string kAttrFormat = 
"__addcol1__reptime=yyyymmddHHMMSS&__addcol2_ip=xxx.xxx.xxx.xxx"; // msg_type 7 
body's attr format
+        static const int32_t kAttrLen = kAttrFormat.size();
+
+        static const char kTDBusCAPIVersion[] = "dataproxy_sdk_cpp-v2.0.0";
+        static const char kLogName[] = "dataproxy_cpp.log";
+        static const uint16_t kBinaryMagic = 0xEE01;
+        static const uint32_t kBinPackMethod = 7;     // msg>=7
+        static const uint8_t kBinSnappyFlag = 1 << 5; // snappy flag
+        static const int32_t kBackupBusNum = 4;       // backup_proxy_num
+
+        static const int32_t kThreadNums = 10; 
+        static const int32_t kSharedBufferNums=5;
+        static const bool kEnableBidIsolation = false; 
+        static const int32_t kBufferNumPerBid = 5; 
+        static const bool kEnablePack = true;
+        static const uint32_t kPackSize = 4096;    
+        static const uint32_t kPackTimeout = 3000; 
+        static const uint32_t kExtPackSize = 16384; 
+        static const bool kEnableZip = true;
+        static const uint32_t kMinZipLen = 512; 
+
+        static const bool kEnableRetry = true;
+        static const uint32_t kRetryInterval = 3000; 
+        static const uint32_t kRetryNum = 3;         
+        static const uint32_t kLogNum = 10;
+        static const uint32_t kLogSize = 10;   
+        static const uint8_t kLogLevel = 1;    
+        static const uint8_t kLogFileType = 2;
+        static const std::string kLogPath = "./logs/";
+        static const bool kLogEnableLimit = true;
+
+        static const std::string kProxyURL = 
"http://127.0.0.1:8099/api/dataproxy_ip_v2";;
+        static const bool kEnableProxyURLFromCluster = false;
+        static const std::string kBusClusterURL =
+            
"http://127.0.0.1:8099/heartbeat/dataproxy_ip_v2?cluster_id=0&net_tag=normal";;
+        static const uint32_t kProxyUpdateInterval = 10;
+        static const uint32_t kProxyURLTimeout = 2;
+        static const uint32_t kMaxActiveProxyNum = 3;
+
+        static const std::string kSerIP = "127.0.0.1"; 
+        static const uint32_t kMaxBufPool = 50 * 1024 * 1024;
+        static const uint32_t kMsgType = 7;
+
+        static const bool kEnableTCPNagle = true;
+        static const bool kEnableHeartBeat = true;
+        static const uint32_t kHeartBeatInterval = 60; 
+        static const bool kEnableSetAffinity = false;
+        static const uint32_t kMaskCPUAffinity = 0xff;
+        static const bool kIsFromDC = false;
+        static const uint16_t kExtendField = 0;
+        static const std::string kNetTag = "all";
+
+        static const bool kNeedAuth = false;
+
+        // http basic auth  
+        static const std::string kBasicAuthHeader = "Authorization:";
+        static const std::string kBasicAuthPrefix = "Basic";
+        static const std::string kBasicAuthSeparator = " ";
+        static const std::string kBasicAuthJoiner = ":";
+
+    } // namespace constants
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_CONSTANT_H_
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/singleton.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/singleton.h
new file mode 100644
index 000000000..6e927de87
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/singleton.h
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_SINGLETON_H_
+#define DATAPROXY_SDK_BASE_SINGLETON_H_
+
+#include <assert.h>
+#include <mutex>
+#include <thread>
+
+#include "noncopyable.h"
+
+namespace dataproxy_sdk {
+template <typename T>
+class Singleton : noncopyable {
+private:
+    static std::once_flag once_;
+    static T*             value_;
+
+public:
+    Singleton() = delete;
+    ~Singleton() = delete;
+
+    static T& instance()
+    {
+        std::call_once(once_, Singleton::init);
+        assert(value_ != nullptr);
+
+        return *value_;
+    }
+
+private:
+    static void init()
+    {
+        value_ = new T();
+    }
+};
+
+template <typename T>
+std::once_flag Singleton<T>::once_;
+
+template <typename T>
+T* Singleton<T>::value_ = nullptr;
+
+}  // namespace dataproxy_sdk
+
+#endif  // DATAPROXY_SDK_BASE_SINGLETON_H_
\ No newline at end of file
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.cc
new file mode 100644
index 000000000..0a832b914
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.cc
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "utils.h"
+
+#include <arpa/inet.h>
+#include <ctime>
+#include <curl/curl.h>
+#include <errno.h>
+#include <fstream>
+#include <iostream>
+#include <iterator>
+#include <net/if.h>
+#include <netinet/in.h>
+#include <pthread.h>
+#include <regex>
+#include <sstream>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <sys/sysinfo.h>
+#include <sys/time.h>
+
+#include "logger.h"
+#include "tc_api.h"
+namespace dataproxy_sdk
+{
+uint16_t Utils::sequence     = 0;
+uint64_t Utils::last_msstamp = 0;
+char Utils::snowflake_id[35] = {0};
+char base64_table[] = {
+      'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
+      'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
+      'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
+      'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
+      'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
+      'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
+      'w', 'x', 'y', 'z', '0', '1', '2', '3',
+      '4', '5', '6', '7', '8', '9', '+', '/'
+    };
+
+void Utils::taskWaitTime(int32_t sec)
+{
+    struct timeval tv;
+    tv.tv_sec  = sec;
+    tv.tv_usec = 0;
+    int err;
+    do
+    {
+        err = select(0, NULL, NULL, NULL, &tv);
+    } while (err < 0 && errno == EINTR);
+}
+
+uint64_t Utils::getCurrentMsTime()
+{
+    uint64_t ms_time = 0;
+    struct timeval tv;
+    gettimeofday(&tv, NULL);
+    ms_time = ((uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000);
+    return ms_time;
+}
+uint64_t Utils::getCurrentWsTime()
+{
+    uint64_t ws_time = 0;
+    struct timeval tv;
+    gettimeofday(&tv, NULL);
+    ws_time = ((uint64_t)tv.tv_sec * 1000000 + tv.tv_usec);
+    return ws_time;
+}
+
+std::string Utils::getFormatTime(uint64_t data_time)
+{
+    struct tm timeinfo;
+    char buffer[80];
+
+    // time(&rawtime);
+    time_t m_time = data_time / 1000;
+    localtime_r(&m_time, &timeinfo);
+
+    strftime(buffer, sizeof(buffer), "%Y%m%d%H%M%S", &timeinfo);
+
+    return std::string(buffer);
+}
+
+size_t Utils::zipData(const char* input, uint32_t input_len, std::string& 
zip_res)
+{
+    size_t len_after_zip = snappy::Compress((char*)input, input_len, &zip_res);
+    LOG_TRACE("data zip: input len is %u, output len is %u.", input_len, 
len_after_zip);
+    return len_after_zip;
+}
+
+char* Utils::getSnowflakeId()
+{
+    std::string local_host;
+    getFirstIpAddr(local_host);
+    uint64_t ipaddr = htonl(inet_addr(local_host.c_str()));
+    uint32_t pidid  = static_cast<uint16_t>((getpid() & 0xFFFF));
+    uint32_t selfid = static_cast<uint16_t>((pthread_self() & 0xFFFF00) >> 8);
+
+    // 22bit ms
+    uint64_t sequence_mask = -1LL ^ (-1LL << 22);
+
+    uint64_t time_id  = 0LL;
+    uint64_t local_id = (ipaddr << 32) | (pidid << 16) | (selfid);
+
+    uint64_t since_date = 1288834974657LL;  // Thu, 04 Nov 2010 01:42:54 GMT
+
+    // 41bit ms
+    uint64_t msstamp = getCurrentMsTime();
+
+    uint64_t rand      = 0;
+    uint64_t rand_mask = -1LL ^ (-1LL << (32 + 5 + 12));
+
+    // error timestap 
+    if (msstamp < last_msstamp)
+    {
+        LOG_ERROR("ms(%llx) time less last(%llx).", msstamp, last_msstamp);
+
+        //last ms 
+        last_msstamp = msstamp;
+
+        srand(static_cast<uint32_t>(msstamp));
+        rand = random();
+
+        // generate id
+        time_id = ((msstamp - since_date) << 22 | (rand & rand_mask));
+
+        snprintf(&snowflake_id[0], sizeof(snowflake_id), "0x%.16llx%.16llx", 
local_id, time_id);
+        return &snowflake_id[0];
+    }
+
+    // increase id
+    if (last_msstamp == msstamp)
+    {
+        sequence = (sequence + 1) & sequence_mask;
+
+        if (0 == sequence) { msstamp = waitNextMills(last_msstamp); }
+    }
+    else
+    {
+        sequence = 0;
+    }
+
+    last_msstamp = msstamp;
+
+    time_id = (((msstamp - since_date) << 22) | sequence);
+
+    LOG_TRACE("ms:0x%llx, ip:0x%.16llx, seq:0x:%x, selfid:%u, "
+              "local_id:0x%.16llx, time_id:0x%.16llx.",
+              (msstamp - since_date) << (22), ipaddr, sequence, 
static_cast<uint32_t>(pthread_self()), local_id, time_id);
+
+    snprintf(&snowflake_id[0], sizeof(snowflake_id), "0x%.16llx%.16llx", 
local_id, time_id);
+    return &snowflake_id[0];
+}
+
+int64_t Utils::waitNextMills(int64_t last_ms)
+{
+    int64_t msstamp = getCurrentMsTime();
+
+    while (msstamp <= last_ms)
+    {
+        msstamp = getCurrentMsTime();
+    }
+
+    return msstamp;
+}
+
+bool Utils::getFirstIpAddr(std::string& local_host)
+{
+    int32_t sockfd;
+    int32_t ip_num = 0;
+    char buf[1024] = {0};
+    struct ifreq* ifreq;
+    struct ifreq if_flag;
+    struct ifconf ifconf;
+
+    ifconf.ifc_len = sizeof(buf);
+    ifconf.ifc_buf = buf;
+    if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
+    {
+        LOG_ERROR("open the local socket(AF_INET, SOCK_DGRAM) failure!");
+        return false;
+    }
+
+    ioctl(sockfd, SIOCGIFCONF, &ifconf);
+
+    ifreq  = (struct ifreq*)buf;
+    ip_num = ifconf.ifc_len / sizeof(struct ifreq);
+    for (int32_t i = 0; i < ip_num; i++, ifreq++)
+    {
+        //exclude ipv6 addr
+        if (ifreq->ifr_flags != AF_INET) { continue; }
+
+        if (0 == strncmp(&ifreq->ifr_name[0], "lo", sizeof("lo"))) { continue; 
}
+
+        memcpy(&if_flag.ifr_name[0], &ifreq->ifr_name[0], 
sizeof(ifreq->ifr_name));
+
+        if ((ioctl(sockfd, SIOCGIFFLAGS, (char*)&if_flag)) < 0) { continue; }
+
+        if ((if_flag.ifr_flags & IFF_LOOPBACK) || !(if_flag.ifr_flags & 
IFF_UP)) { continue; }
+
+        if (!strncmp(inet_ntoa(((struct 
sockaddr_in*)&(ifreq->ifr_addr))->sin_addr), "127.0.0.1", 7)) { continue; }
+
+        local_host = inet_ntoa(((struct 
sockaddr_in*)&(ifreq->ifr_addr))->sin_addr);
+        close(sockfd);
+        return true;
+    }
+    close(sockfd);
+    // local_host = "127.0.0.1";
+    return false;
+}
+
+bool Utils::bindCPU(int32_t cpu_id)
+{
+    int32_t cpunum  = get_nprocs();
+    int32_t cpucore = cpu_id;
+    cpu_set_t mask;
+
+    if (abs(cpu_id) > cpunum)
+    {
+        LOG_ERROR("mask<%d> more than total cpu num<%d>.", cpu_id, cpunum);
+        return false;
+    }
+
+    if (cpu_id < 0) { cpucore = cpunum + cpu_id; }
+
+    CPU_ZERO(&mask);
+    CPU_SET(cpucore, &mask);
+
+    if (sched_setaffinity(0, sizeof(mask), &mask) < 0) { LOG_ERROR("set CPU 
affinity<%d>/<%d> errno<%d>", cpu_id, cpunum, errno); }
+
+    return true;
+}
+
+std::string Utils::base64_encode(const std::string& data)
+{
+    size_t in_len = data.size();
+    size_t out_len = 4 * ((in_len + 2) / 3);
+    std::string ret(out_len, '\0');
+    size_t i;
+    char *p = const_cast<char*>(ret.c_str());
+
+    for (i = 0; i < in_len - 2; i += 3) {
+      *p++ = base64_table[(data[i] >> 2) & 0x3F];
+      *p++ = base64_table[((data[i] & 0x3) << 4) | ((int) (data[i + 1] & 0xF0) 
>> 4)];
+      *p++ = base64_table[((data[i + 1] & 0xF) << 2) | ((int) (data[i + 2] & 
0xC0) >> 6)];
+      *p++ = base64_table[data[i + 2] & 0x3F];
+    }
+    if (i < in_len) {
+      *p++ = base64_table[(data[i] >> 2) & 0x3F];
+      if (i == (in_len - 1)) {
+        *p++ = base64_table[((data[i] & 0x3) << 4)];
+        *p++ = '=';
+      }
+      else {
+        *p++ = base64_table[((data[i] & 0x3) << 4) | ((int) (data[i + 1] & 
0xF0) >> 4)];
+        *p++ = base64_table[((data[i + 1] & 0xF) << 2)];
+      }
+      *p++ = '=';
+    }
+    return ret;
+}
+
+std::string Utils::genBasicAuthCredential(const std::string& id, const 
std::string& key)
+{
+    std::string credential = id + constants::kBasicAuthJoiner + key;
+    return constants::kBasicAuthPrefix + constants::kBasicAuthSeparator + 
base64_encode(credential);
+}
+
+int32_t Utils::requestUrl(std::string& res, const HttpRequest* request)
+{
+    CURL* curl = NULL;
+    struct curl_slist* list = NULL;
+
+    curl_global_init(CURL_GLOBAL_ALL);
+
+    curl = curl_easy_init();
+    if (!curl)
+    {
+        LOG_ERROR("failed to init curl object");
+        return SDKInvalidResult::kErrorCURL;
+    }
+
+    // http header
+    list = curl_slist_append(list, "Content-Type: 
application/x-www-form-urlencoded");
+    if ( request->need_auth && !request->auth_id.empty() && 
!request->auth_key.empty())
+    {
+        // Authorization: Basic xxxxxxxx
+        std::string auth = constants::kBasicAuthHeader + 
constants::kBasicAuthSeparator + genBasicAuthCredential(request->auth_id, 
request->auth_key);
+        LOG_INFO("request manager, auth-header:%s", auth.c_str());
+        list = curl_slist_append(list, auth.c_str());
+    }
+    curl_easy_setopt(curl, CURLOPT_HTTPHEADER, list);
+    
+    //set url
+    curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
+    curl_easy_setopt(curl, CURLOPT_URL, request->url.c_str());
+    curl_easy_setopt(curl, CURLOPT_POSTFIELDS, request->post_data.c_str());
+    curl_easy_setopt(curl, CURLOPT_TIMEOUT, request->timeout);
+
+    //register callback and get res
+    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &Utils::getUrlResponse);
+    curl_easy_setopt(curl, CURLOPT_WRITEDATA, &res);
+    curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
+
+    //execute curl request
+    CURLcode ret = curl_easy_perform(curl);
+    if (ret != 0)
+    {
+        LOG_ERROR("%s", curl_easy_strerror(ret));
+        LOG_ERROR("failed to request data from %s", request->url.c_str());
+        if (curl) curl_easy_cleanup(curl);
+        curl_global_cleanup();
+
+        return SDKInvalidResult::kErrorCURL;
+    }
+
+    int32_t code;
+    curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &code);
+    if (code != 200)
+    {
+        LOG_ERROR("tdm responsed with code %d", code);
+        if (curl) curl_easy_cleanup(curl);
+        curl_global_cleanup();
+
+        return SDKInvalidResult::kErrorCURL;
+    }
+
+    if (res.empty())
+    {
+        LOG_ERROR("tdm return empty data");
+        if (curl) curl_easy_cleanup(curl);
+        curl_global_cleanup();
+
+        return SDKInvalidResult::kErrorCURL;
+    }
+
+    //clean work
+    curl_easy_cleanup(curl);
+    curl_global_cleanup();
+
+    return 0;
+}
+
+size_t Utils::getUrlResponse(void* buffer, size_t size, size_t count, void* 
response)
+{
+    std::string* str = (std::string*)response;
+    (*str).append((char*)buffer, size * count);
+
+    return size * count;
+}
+
+bool Utils::readFile(const std::string& file_path, std::string& content)
+{
+    std::ifstream f(file_path.c_str());
+    if (f.fail())
+    {
+        LOG_ERROR("fail to read file:%s, please check file path", 
file_path.c_str());
+        return false;
+    }
+    std::stringstream ss;
+    ss << f.rdbuf();
+    content = ss.str();
+    return true;
+}
+
+static const char kWhitespaceCharSet[] = " \n\r\t\f\v";
+
+std::string Utils::trim(const std::string& source)
+{
+    std::string target = source;
+    if (!target.empty())
+    {
+        size_t foud_pos = target.find_first_not_of(kWhitespaceCharSet);
+        if (foud_pos != std::string::npos) { target = target.substr(foud_pos); 
}
+        foud_pos = target.find_last_not_of(kWhitespaceCharSet);
+        if (foud_pos != std::string::npos) { target = target.substr(0, 
foud_pos + 1); }
+    }
+    return target;
+}
+
+int32_t Utils::splitOperate(const std::string& source, 
std::vector<std::string>& result, const std::string& delimiter)
+{
+    std::string item_str;
+    std::string::size_type pos1 = 0;
+    std::string::size_type pos2 = 0;
+    result.clear();
+    if (!source.empty())
+    {
+        pos1 = 0;
+        pos2 = source.find(delimiter);
+        while (std::string::npos != pos2)
+        {
+            item_str = trim(source.substr(pos1, pos2 - pos1));
+            pos1     = pos2 + delimiter.size();
+            pos2     = source.find(delimiter, pos1);
+            if (!item_str.empty()) { result.push_back(item_str); }
+        }
+        if (pos1 != source.length())
+        {
+            item_str = trim(source.substr(pos1));
+            if (!item_str.empty()) { result.push_back(item_str); }
+        }
+    }
+    return result.size();
+}
+
+std::string Utils::getVectorStr(std::vector<std::string>& vs)
+{
+    std::string res;
+    for (auto& it : vs)
+    {
+        res += it + ", ";
+    }
+    return res;
+}
+
+}  // namespace dataproxy_sdk
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.h
new file mode 100644
index 000000000..e33f0ce59
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.h
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_UTILS_H_
+#define DATAPROXY_SDK_BASE_UTILS_H_
+
+#include <snappy.h>
+#include <stdint.h>
+#include <string>
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <utility>
+#include <vector>
+namespace dataproxy_sdk
+{
+using PAIR = std::pair<std::string, int32_t>;
+struct HttpRequest
+{
+  std::string url;
+  uint32_t timeout;
+  bool need_auth;
+  std::string auth_id;
+  std::string auth_key;
+  std::string post_data;
+
+};
+
+class Utils
+{
+  private:
+    static char snowflake_id[35];
+    static uint16_t sequence;
+    static uint64_t last_msstamp;
+
+  public:
+    static void taskWaitTime(int32_t sec);
+    static uint64_t getCurrentMsTime();                                        
          
+    static uint64_t getCurrentWsTime();                                        
         
+    static std::string getFormatTime(uint64_t date_time);                      
          //format time: yyyymmddHHMMSS
+    static size_t zipData(const char* input, uint32_t input_len, std::string& 
zip_res);  //snappy data
+    static char* getSnowflakeId();                                             
          //get 64bit snowflakeId
+    static bool getFirstIpAddr(std::string& local_host);                       
          
+    inline static bool isLegalTime(uint64_t report_time)                       
          
+    {
+        return ((report_time > 1435101567000LL) && (report_time < 
4103101567000LL));
+    }
+    static bool bindCPU(int32_t cpu_id);
+    static std::string base64_encode(const std::string& data);
+    static std::string genBasicAuthCredential(const std::string& id, const 
std::string& key);                                                   
+    static int32_t requestUrl(std::string& res, const HttpRequest* request);
+    static bool readFile(const std::string& file_path, std::string& content);  
//read file content, save as res, return true is success
+    static int32_t splitOperate(const std::string& source, 
std::vector<std::string>& result, const std::string& delimiter);
+    static std::string getVectorStr(std::vector<std::string>& vs);
+
+    static bool upValueSort(const PAIR& lhs, const PAIR& rhs) { return 
lhs.second < rhs.second; }
+    static bool downValueSort(const PAIR& lhs, const PAIR& rhs) { return 
lhs.second > rhs.second; }
+
+  private:
+    static size_t getUrlResponse(void* buffer, size_t size, size_t count, 
void* response);
+    static int64_t waitNextMills(int64_t last_ms);
+    static std::string trim(const std::string& source);
+};
+
+}  // namespace dataproxy_sdk
+
+#endif  // DATAPROXY_SDK_BASE_UTILS_H_
\ No newline at end of file

Reply via email to