This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3c33c8aab [INLONG-5249][SDK] DataProxy SDK(cpp) utils file (#5250)
3c33c8aab is described below
commit 3c33c8aaba05e8b83c750bcdf8cb249db950b3bb
Author: xueyingzhang <[email protected]>
AuthorDate: Fri Jul 29 17:40:14 2022 +0800
[INLONG-5249][SDK] DataProxy SDK(cpp) utils file (#5250)
---
.../dataproxy-sdk-cpp/src/base/atomic.h | 101 +++++
.../dataproxy-sdk-cpp/src/base/ini_help.cc | 292 ++++++++++++++
.../dataproxy-sdk-cpp/src/base/ini_help.h | 94 +++++
.../dataproxy-sdk-cpp/src/base/logger.cc | 97 +++++
.../dataproxy-sdk-cpp/src/base/logger.h | 128 ++++++
.../dataproxy-sdk-cpp/src/base/msg_protocol.h | 90 +++++
.../dataproxy-sdk-cpp/src/base/noncopyable.h | 37 ++
.../dataproxy-sdk-cpp/src/base/read_write_mutex.h | 133 +++++++
.../dataproxy-sdk-cpp/src/base/recv_buffer.h | 419 ++++++++++++++++++++
.../dataproxy-sdk-cpp/src/base/sdk_constant.h | 102 +++++
.../dataproxy-sdk-cpp/src/base/singleton.h | 63 +++
.../dataproxy-sdk-cpp/src/base/utils.cc | 435 +++++++++++++++++++++
.../dataproxy-sdk-cpp/src/base/utils.h | 84 ++++
13 files changed, 2075 insertions(+)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/atomic.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/atomic.h
new file mode 100644
index 000000000..6babab6a0
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/atomic.h
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_ATOMIC_H_
+#define DATAPROXY_SDK_BASE_ATOMIC_H_
+
+#include <stdint.h>
+
+#include "noncopyable.h"
+namespace dataproxy_sdk {
+template <typename T>
+class AtomicIntegerT : noncopyable {
+private:
+ volatile T value_;
+
+public:
+ AtomicIntegerT() : value_(0) {}
+
+ explicit AtomicIntegerT(T value) : value_(value) {}
+
+ // if value_ equals oldval, update it as newval and return true;
+ // otherwise, return false
+ inline bool compareAndSwap(T oldval, T newval)
+ {
+ return __sync_bool_compare_and_swap(&value_, oldval, newval);
+ }
+
+ inline T get()
+ {
+ return __sync_val_compare_and_swap(&value_, 0, 0);
+ }
+
+ inline T getAndAdd(T x)
+ {
+ return __sync_fetch_and_add(&value_, x);
+ }
+
+ inline T getAndIncrease()
+ {
+ return __sync_fetch_and_add(&value_, 1);
+ }
+
+ inline T addAndGet(T x)
+ {
+ return getAndAdd(x) + x;
+ }
+
+ inline T incrementAndGet()
+ {
+ return addAndGet(1);
+ }
+
+ inline T decrementAndGet()
+ {
+ return addAndGet(-1);
+ }
+
+ inline void add(T x)
+ {
+ getAndAdd(x);
+ }
+
+ inline void increment()
+ {
+ incrementAndGet();
+ }
+
+ inline void decrement()
+ {
+ decrementAndGet();
+ }
+
+ inline T getAndSet(T newValue)
+ {
+ return __sync_lock_test_and_set(&value_, newValue);
+ }
+};
+
+using AtomicInt = AtomicIntegerT<int32_t>;
+
+using AtomicUInt = AtomicIntegerT<uint32_t>;
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_ATOMIC_H_
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/ini_help.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/ini_help.cc
new file mode 100644
index 000000000..9959bafb7
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/ini_help.cc
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "ini_help.h"
+
+#include <ctype.h>
+#include <fstream>
+#include <iostream>
+#include <stdio.h>
+#include <stdlib.h>
+
+namespace dataproxy_sdk
+{
+
+/**
+ * @description: parse line content
+ * @return {*} true if success
+ * @param {string&} content
+ * @param {string*} key
+ * @param {string*} value
+ */
+bool IniFile::parse(const std::string& content, std::string* key, std::string*
value) { return split(content, "=", key, value); }
+
+int IniFile::updateSection(const std::string& cleanLine,
+ const std::string& comment,
+ const std::string& rightComment,
+ IniSection** section)
+{
+ IniSection* newSection;
+ // find ']'
+ size_t index = cleanLine.find_first_of(']');
+ if (index == std::string::npos)
+ {
+ err_msg_ = std::string("no matched ] found");
+ return 1;
+ }
+
+ int len = index - 1;
+
+ if (len <= 0)
+ {
+ err_msg_ = std::string("section name is empty");
+ return 1;
+ }
+
+ // set section name
+ std::string s(cleanLine, 1, len);
+
+ trim(s);
+
+ //check section
+ if (getSection(s) != NULL)
+ {
+ err_msg_ = std::string("section ") + s + std::string("already exist");
+ return 1;
+ }
+
+ newSection = new IniSection();
+ newSection->name = s;
+ newSection->comment = comment;
+ newSection->rightComment = rightComment;
+
+ sections_.push_back(newSection);
+
+ *section = newSection;
+
+ return 0;
+}
+
+int IniFile::addKV(const std::string& cleanLine,
+ const std::string& comment,
+ const std::string& rightComment,
+ IniSection* section)
+{
+ std::string key, value;
+
+ if (!parse(cleanLine, &key, &value))
+ {
+ err_msg_ = std::string("parse line failed:") + cleanLine;
+ return 1;
+ }
+
+ Iterm item;
+ item.key = key;
+ item.value = value;
+ item.comment = comment;
+ item.rightComment = rightComment;
+
+ section->items.push_back(item);
+
+ return 0;
+}
+
+int IniFile::load(const std::string& fileName)
+{
+ int err;
+ std::string line;
+ std::string comment;
+ std::string rightComment;
+ IniSection* currSection = NULL; //init a section
+
+ close();
+
+ ini_file_name_ = fileName;
+ std::ifstream istream(ini_file_name_);
+ if (!istream.is_open())
+ {
+ err_msg_ = std::string("open") + ini_file_name_ + std::string(" file
failed");
+ return 1;
+ }
+
+ // add new section
+ currSection = new IniSection();
+ currSection->name = "";
+ sections_.push_back(currSection);
+
+ // read line
+ while (std::getline(istream, line))
+ {
+ trim(line);
+
+ // skip empty
+ if (line.length() <= 0)
+ {
+ comment += delim;
+ continue;
+ }
+
+ // whether contains section or key
+ // find '['
+ if (line[0] == '[') { err = updateSection(line, comment, rightComment,
&currSection); }
+ else
+ {
+ err = addKV(line, comment, rightComment, currSection);
+ }
+
+ if (err != 0)
+ {
+ istream.close();
+ return err;
+ }
+
+ // clear
+ comment = "";
+ rightComment = "";
+ }
+
+ istream.close();
+
+ return 0;
+}
+
+IniSection* IniFile::getSection(const std::string& section)
+{
+ for (SectionIterator it = sections_.begin(); it != sections_.end(); ++it)
+ {
+ if ((*it)->name == section) { return *it; }
+ }
+
+ return NULL;
+}
+
+int IniFile::getString(const std::string& section, const std::string& key,
std::string* value)
+{
+ return getValue(section, key, value);
+}
+
+int IniFile::getInt(const std::string& section, const std::string& key, int*
intValue)
+{
+ int err;
+ std::string strValue;
+
+ err = getValue(section, key, &strValue);
+
+ *intValue = atoi(strValue.c_str());
+
+ return err;
+}
+
+int IniFile::getValue(const std::string& section, const std::string& key,
std::string* value)
+{
+ std::string comment;
+ return getValue(section, key, value, &comment);
+}
+
+int IniFile::getValue(const std::string& section, const std::string& key,
std::string* value, std::string* comment)
+{
+ IniSection* sect = getSection(section);
+
+ if (sect == NULL)
+ {
+ err_msg_ = std::string("not find the section ") + section;
+ return 1;
+ }
+
+ for (IniSection::ItermIterator it = sect->begin(); it != sect->end(); ++it)
+ {
+ if (it->key == key)
+ {
+ *value = it->value;
+ *comment = it->comment;
+ return 0;
+ }
+ }
+
+ err_msg_ = std::string("not find the key ") + key;
+ return 1;
+}
+
+void IniFile::close()
+{
+ ini_file_name_ = "";
+
+ for (SectionIterator it = sections_.begin(); it != sections_.end(); ++it)
+ {
+ delete (*it); // clear section
+ }
+
+ sections_.clear();
+}
+
+void IniFile::trim(std::string& str)
+{
+ int len = str.length();
+
+ int i = 0;
+
+ while ((i < len) && isspace(str[i]) && (str[i] != '\0'))
+ {
+ i++;
+ }
+
+ if (i != 0) { str = std::string(str, i, len - i); }
+
+ len = str.length();
+
+ for (i = len - 1; i >= 0; --i)
+ {
+ if (!isspace(str[i])) { break; }
+ }
+
+ str = std::string(str, 0, i + 1);
+}
+
+bool IniFile::split(const std::string& str, const std::string& sep,
std::string* pleft, std::string* pright)
+{
+ size_t pos = str.find(sep);
+ std::string left, right;
+
+ if (pos != std::string::npos)
+ {
+ left = std::string(str, 0, pos);
+ right = std::string(str, pos + 1);
+
+ trim(left);
+ trim(right);
+
+ *pleft = left;
+ *pright = right;
+ return true;
+ }
+ else
+ {
+ left = str;
+ right = "";
+
+ trim(left);
+
+ *pleft = left;
+ *pright = right;
+ return false;
+ }
+}
+
+
+} // namespace dataproxy_sdk
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/ini_help.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/ini_help.h
new file mode 100644
index 000000000..235752652
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/ini_help.h
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_INI_HELP_H_
+#define DATAPROXY_SDK_BASE_INI_HELP_H_
+
+#include <algorithm>
+#include <string.h>
+#include <string>
+#include <vector>
+
+namespace dataproxy_sdk
+{
+const char delim[] = "\n";
+struct Iterm
+{
+ std::string key;
+ std::string value;
+ std::string comment;
+ std::string rightComment;
+};
+
+struct IniSection
+{
+ using ItermIterator = std::vector<Iterm>::iterator;
+ ItermIterator begin()
+ {
+ return items.begin();
+ }
+
+ ItermIterator end()
+ {
+ return items.end();
+ }
+
+ std::string name;
+ std::string comment;
+ std::string rightComment;
+ std::vector<Iterm> items;
+};
+
+class IniFile
+{
+ public:
+ IniFile(){}
+ ~IniFile() { close(); }
+
+ int load(const std::string& fileName);
+ int getString(const std::string& section, const std::string& key,
std::string* value);
+ int getInt(const std::string& section, const std::string& key, int* value);
+
+ private:
+ IniSection* getSection(const std::string& section = "");
+ static void trim(std::string& str);
+ int updateSection(const std::string& cleanLine,
+ const std::string& comment,
+ const std::string& rightComment,
+ IniSection** section);
+ int addKV(const std::string& cleanLine,
+ const std::string& comment,
+ const std::string& rightComment,
+ IniSection* section);
+ void close();
+ bool split(const std::string& str, const std::string& sep, std::string*
left, std::string* right);
+ bool parse(const std::string& content, std::string* key, std::string*
value);
+ int getValue(const std::string& section, const std::string& key,
std::string* value);
+ int getValue(const std::string& section, const std::string& key,
std::string* value, std::string* comment);
+
+ private:
+ using SectionIterator = std::vector<IniSection*>::iterator;
+ std::vector<IniSection*> sections_;
+ std::string ini_file_name_;
+ std::string err_msg_; // save err msg
+};
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_INI_HELP_H_
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.cc
new file mode 100644
index 000000000..a4fa43e6b
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.cc
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "logger.h"
+
+#include <log4cplus/consoleappender.h>
+#include <log4cplus/fileappender.h>
+#include <log4cplus/layout.h>
+#include <log4cplus/logger.h>
+#include <log4cplus/loggingmacros.h>
+#include <stdarg.h>
+
+#include <string>
+
+#include "singleton.h"
+
+namespace dataproxy_sdk
+{
+Logger& getLogger() { return Singleton<Logger>::instance(); }
+
+bool Logger::init(uint32_t file_max_size,
+ uint32_t file_num,
+ uint8_t level,
+ uint8_t output_type,
+ bool enable_limit,
+ const std::string& base_path,
+ const std::string& logname)
+{
+ file_max_size_ = file_max_size;
+ file_num_ = file_num;
+ level_ = level;
+ output_type_ = output_type;
+ enable_limit_ = enable_limit;
+ base_path_ = base_path;
+ log_name_ = logname;
+ setUp();
+ return true;
+}
+
+bool Logger::write(const char* format, ...)
+{
+ char buf[8192];
+ va_list ap;
+ va_start(ap, format);
+ vsnprintf(buf, sizeof(buf) - 1, format, ap);
+ va_end(ap);
+ return writeCharStream(buf);
+}
+
+bool Logger::writeCharStream(const char* log)
+{
+ auto logger = log4cplus::Logger::getInstance(instance_);
+ logger.forcedLog(log4cplus::TRACE_LOG_LEVEL, log);
+ return true;
+}
+
+void Logger::setUp()
+{
+ bool immediate_flush = true;
+ std::string pattern = "[%D{%Y-%m-%d %H:%M:%S.%q}]%m%n";
+ auto logger_d = log4cplus::Logger::getInstance(instance_);
+ logger_d.removeAllAppenders();
+ logger_d.setLogLevel(log4cplus::TRACE_LOG_LEVEL);
+
+ if (output_type_ == 2)
+ { //file
+ log4cplus::SharedAppenderPtr fileAppender(new
log4cplus::RollingFileAppender(
+ base_path_ + log_name_, file_max_size_ * kMBSize, file_num_,
immediate_flush, true));
+ std::unique_ptr<log4cplus::Layout> layout(new
log4cplus::PatternLayout(pattern));
+ fileAppender->setLayout(std::move(layout));
+ logger_d.addAppender(fileAppender);
+ }
+ else
+ { //console
+ log4cplus::SharedAppenderPtr consoleAppender(new
log4cplus::ConsoleAppender());
+ consoleAppender->setLayout(std::unique_ptr<log4cplus::Layout>(new
log4cplus::SimpleLayout()));
+ logger_d.addAppender(consoleAppender);
+ }
+}
+
+} // namespace dataproxy_sdk
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.h
new file mode 100644
index 000000000..e7582f25f
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/logger.h
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_LOGGER_H_
+#define DATAPROXY_SDK_BASE_LOGGER_H_
+
+#include <stdint.h>
+#include <string.h>
+#include <string>
+#include <unistd.h>
+#include <vector>
+
+#include "sdk_constant.h"
+
+namespace dataproxy_sdk
+{
+static const uint32_t kMBSize = 1024 * 1024;
+const uint32_t kPid = getpid();
+
+class Logger;
+
+Logger& getLogger();
+
+// only show fileName
+#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 :
__FILE__)
+
+#define LOG_LEVEL(level, fmt, ...)
\
+ {
\
+ if (dataproxy_sdk::getLogger().enableLevel(level))
\
+ {
\
+ dataproxy_sdk::getLogger().write("[pid:%d][%s][%s:%s:%d]" fmt,
kPid, dataproxy_sdk::Logger::level2String(level), \
+ __FILENAME__, __func__, __LINE__,
##__VA_ARGS__); \
+ }
\
+ }
+
+#define LOG_TRACE(fmt, ...) LOG_TDBUSCAPI(dataproxy_sdk::getLogger(),
dataproxy_sdk::Logger::kLogTrace, fmt, ##__VA_ARGS__)
+#define LOG_DEBUG(fmt, ...) LOG_TDBUSCAPI(dataproxy_sdk::getLogger(),
dataproxy_sdk::Logger::kLogDebug, fmt, ##__VA_ARGS__)
+#define LOG_INFO(fmt, ...) LOG_TDBUSCAPI(dataproxy_sdk::getLogger(),
dataproxy_sdk::Logger::kLogInfo, fmt, ##__VA_ARGS__)
+#define LOG_WARN(fmt, ...) LOG_TDBUSCAPI(dataproxy_sdk::getLogger(),
dataproxy_sdk::Logger::kLogWarn, fmt, ##__VA_ARGS__)
+#define LOG_ERROR(fmt, ...) LOG_TDBUSCAPI(dataproxy_sdk::getLogger(),
dataproxy_sdk::Logger::kLogError, fmt, ##__VA_ARGS__)
+
+#define LOG_TDBUSCAPI(logger, level, fmt, ...)
\
+ {
\
+ if (logger.enableLevel(level))
\
+ {
\
+ logger.write("[pid:%d][%s][%s:%s:%d]" fmt, kPid,
dataproxy_sdk::Logger::level2String(level), __FILENAME__, __func__, \
+ __LINE__, ##__VA_ARGS__);
\
+ }
\
+ }
+
+class Logger
+{
+ public:
+ enum Level {
+ kLogError = 0,
+ kLogWarn = 1,
+ kLogInfo = 2,
+ kLogDebug = 3,
+ kLogTrace = 4,
+ };
+
+ private:
+ uint32_t file_max_size_;
+ uint32_t file_num_;
+ uint8_t level_;
+ uint8_t output_type_; //2->file, 1->console
+ bool enable_limit_;
+ std::string base_path_;
+ std::string log_name_;
+
+ std::string instance_;
+
+ public:
+ Logger()
+ : file_max_size_(10)
+ , file_num_(10)
+ , level_(kLogInfo)
+ , output_type_(2)
+ , enable_limit_(true)
+ , base_path_("./logs/")
+ , instance_("DataProxySDK")
+ {
+ }
+
+ ~Logger() {}
+
+ bool init(uint32_t file_max_size,
+ uint32_t file_num,
+ uint8_t level,
+ uint8_t output_type,
+ bool enable_limit,
+ const std::string& base_path,
+ const std::string& logname = constants::kLogName);
+ bool write(const char* sFormat, ...) __attribute__((format(printf, 2, 3)));
+ inline bool writeStream(const std::string& msg) { return
writeCharStream(msg.c_str()); }
+ inline bool enableLevel(Level level) { return ((level <= level_) ? true :
false); }
+ static const char* level2String(Level level)
+ {
+ static const char* level_names[] = {
+ "ERROR", "WARN", "INFO", "DEBUG", "TRACE",
+ };
+ return level_names[level];
+ }
+
+ private:
+ void setUp();
+ bool writeCharStream(const char* msg);
+};
+
+} // namespace dataproxy_sdk
+
+#endif // CPAI_BASE_LOGGER_H_
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/msg_protocol.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/msg_protocol.h
new file mode 100644
index 000000000..844c9a6c8
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/msg_protocol.h
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_MSG_PROTOCOL_H_
+#define DATAPROXY_SDK_BASE_MSG_PROTOCOL_H_
+
+#include "send_buffer.h"
+#include <memory>
+#include <stdint.h>
+#include <string>
+#include <vector>
+
+namespace dataproxy_sdk
+{
+// msg_type 1~6
+// totalLen(4)|msgtype(1)|bodyLen(4)|body(x)|attrLen(4)|attr(attr_len)
+#pragma pack(1)
+struct ProtocolMsgHead
+{
+ uint32_t total_len;
+ char msg_type;
+};
+struct ProtocolMsgBody
+{
+ uint32_t body_len;
+ char body[0];
+};
+struct ProtocolMsgTail
+{
+ uint32_t attr_len;
+ char attr[0];
+};
+// msg_type=7
+//
totalLen(4)|msgtype(1)|bid_num(2)|tid_num(2)|ext_field(2)|data_time(4)|cnt(2)|uniq(4)|bodyLen(4)|body(x)|attrLen(4)|attr(attr_len)|magic(2)
+struct BinaryMsgHead
+{
+ uint32_t total_len;
+ char msg_type;
+ uint16_t bid_num;
+ uint16_t tid_num;
+ uint16_t ext_field;
+ uint32_t data_time; //second, last pack time
+ uint16_t cnt;
+ uint32_t uniq;
+};
+// msg_type7 ack pack
+struct BinaryMsgAck
+{
+ uint32_t total_len;
+ char msg_type;
+ uint32_t uniq; //buffer id
+ uint16_t attr_len;
+ char attr[0];
+ uint16_t magic;
+};
+
+// binary hb and hb ack, msg_type=8; hb ack: body_ver is 1, if body_len is 2,
there is load in body
+struct BinaryHB
+{
+ uint32_t total_len;
+ char msg_type;
+ uint32_t data_time; // second
+ uint8_t body_ver; // body_ver=1
+ uint32_t body_len; // body_len=0
+ char body[0];
+ uint16_t attr_len; // attr_len=0;
+ char attr[0];
+ uint16_t magic;
+};
+#pragma pack()
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_MSG_PROTOCOL_H_
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/noncopyable.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/noncopyable.h
new file mode 100644
index 000000000..e3569f1ee
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/noncopyable.h
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_NONCOPYABLE_H_
+#define DATAPROXY_SDK_BASE_NONCOPYABLE_H_
+
+namespace dataproxy_sdk
+{
+class noncopyable
+{
+ public:
+ noncopyable(const noncopyable&) = delete;
+ void operator=(const noncopyable&) = delete;
+
+ protected:
+ noncopyable() = default;
+ ~noncopyable() = default;
+};
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_NONCOPYABLE_H_
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/read_write_mutex.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/read_write_mutex.h
new file mode 100644
index 000000000..d23128229
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/read_write_mutex.h
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_READ_WRITE_MUTEX_H_
+#define DATAPROXY_SDK_BASE_READ_WRITE_MUTEX_H_
+
+#include <condition_variable>
+#include <mutex>
+
+namespace dataproxy_sdk
+{
+// wirte operation add lock:unique_read_lock<read_write_mutex> lock( rwmutex );
+// read operation add lock:unique_write_lock<read_write_mutex> lock(rwmutex);
+
+class read_write_mutex
+{
+ public:
+ read_write_mutex() = default;
+ ~read_write_mutex() = default;
+
+ read_write_mutex(const read_write_mutex&) = delete;
+ read_write_mutex& operator=(const read_write_mutex&) = delete;
+
+ read_write_mutex(const read_write_mutex&&) = delete;
+ read_write_mutex& operator=(const read_write_mutex&&) = delete;
+
+ void lock_read()
+ {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ m_cond_read.wait(lock, [this]() -> bool { return m_write_count == 0;
});
+ ++m_read_count;
+ }
+
+ void unlock_read()
+ {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ if (--m_read_count == 0 && m_write_count > 0) {
m_cond_write.notify_one(); }
+ }
+
+ void lock_write()
+ {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ ++m_write_count;
+ m_cond_write.wait(lock, [this]() -> bool { return m_read_count == 0 &&
!m_writing; });
+ m_writing = true;
+ }
+
+ void unlock_write()
+ {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ if (--m_write_count == 0) { m_cond_read.notify_all(); }
+ else
+ {
+ m_cond_write.notify_one();
+ }
+ m_writing = false;
+ }
+
+ private:
+ volatile size_t m_read_count = 0;
+ volatile size_t m_write_count = 0;
+ volatile bool m_writing = false;
+ mutable std::mutex m_mutex; // KEYPOINT: add mutable
+ std::condition_variable m_cond_read;
+ std::condition_variable m_cond_write;
+};
+
+template <typename _ReadWriteLock>
+class unique_read_lock
+{
+ public:
+ explicit unique_read_lock(_ReadWriteLock& rwLock) : m_ptr_rw_lock(&rwLock)
{ m_ptr_rw_lock->lock_read(); }
+
+ ~unique_read_lock()
+ {
+ if (m_ptr_rw_lock) { m_ptr_rw_lock->unlock_read(); }
+ }
+
+ unique_read_lock() = delete;
+ unique_read_lock(const unique_read_lock&) = delete;
+ unique_read_lock& operator=(const unique_read_lock&) = delete;
+ unique_read_lock(const unique_read_lock&&) = delete;
+ unique_read_lock& operator=(const unique_read_lock&&) = delete;
+
+ private:
+ _ReadWriteLock* m_ptr_rw_lock = nullptr;
+};
+
+template <typename _ReadWriteLock>
+class unique_write_lock
+{
+ public:
+ explicit unique_write_lock(_ReadWriteLock& rwLock) :
m_ptr_rw_lock(&rwLock) { m_ptr_rw_lock->lock_write(); }
+
+ ~unique_write_lock()
+ {
+ if (m_ptr_rw_lock) { m_ptr_rw_lock->unlock_write(); }
+ }
+
+ unique_write_lock() = delete;
+ unique_write_lock(const unique_write_lock&) = delete;
+ unique_write_lock& operator=(const unique_write_lock&) = delete;
+ unique_write_lock(const unique_write_lock&&) = delete;
+ unique_write_lock& operator=(const unique_write_lock&&) = delete;
+
+ void unlock()
+ {
+ if (m_ptr_rw_lock) { m_ptr_rw_lock->unlock_write(); }
+ }
+
+ private:
+ _ReadWriteLock* m_ptr_rw_lock = nullptr;
+};
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_READ_WRITE_MUTEX_H_
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/recv_buffer.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/recv_buffer.h
new file mode 100644
index 000000000..8e9977679
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/recv_buffer.h
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_RECV_BUFFER_H_
+#define DATAPROXY_SDK_BASE_RECV_BUFFER_H_
+
+#include <arpa/inet.h>
+#include <assert.h>
+#include <stdint.h>
+#include <string.h>
+
+#include <algorithm>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include "logger.h"
+
+namespace dataproxy_sdk
+{
+class RecvBuffer;
+using RecvBufferPtr = std::shared_ptr<RecvBuffer>;
+
+//store ack data from dataproxy
+class RecvBuffer
+{
+ public:
+ char* buffer_;
+ size_t capacity_;
+ size_t read_index_;
+ size_t write_index_;
+ size_t reserved_prepend_size_;
+ bool has_mem_{true};
+
+ public:
+ enum {
+ kCheapPrependSize = 0,
+ kInitialSize = 1024 * 1024,
+ };
+
+ explicit RecvBuffer(size_t initial_size = kInitialSize, size_t
reserved_prepend_size = kCheapPrependSize)
+ : capacity_(reserved_prepend_size + initial_size)
+ , read_index_(reserved_prepend_size)
+ , write_index_(reserved_prepend_size)
+ , reserved_prepend_size_(reserved_prepend_size)
+ {
+ buffer_ = new char[capacity_];
+ has_mem_ = true;
+ assert(length() == 0);
+ assert(WritableBytes() == initial_size);
+ assert(PrependableBytes() == reserved_prepend_size);
+ }
+
+ ~RecvBuffer()
+ {
+ if (has_mem_ && buffer_)
+ {
+ delete[] buffer_;
+ }
+ buffer_ = nullptr;
+ capacity_ = 0;
+ }
+
+ std::string String()
+ {
+ char buf[1024];
+ snprintf(buf, sizeof(buf),
"buffer:%p,capacity:%ld,readindex:%ld,writeindex:%ld,prependsize:%ld,hasmem:%d",
buffer_,
+ capacity_, read_index_, write_index_, reserved_prepend_size_,
has_mem_);
+ return buf;
+ }
+
+ RecvBufferPtr Slice()
+ {
+ auto buff = std::make_shared<RecvBuffer>(*this);
+ buff->has_mem_ = false;
+ return buff;
+ }
+
+ void Swap(RecvBuffer& rhs)
+ {
+ std::swap(buffer_, rhs.buffer_);
+ std::swap(capacity_, rhs.capacity_);
+ std::swap(read_index_, rhs.read_index_);
+ std::swap(write_index_, rhs.write_index_);
+ std::swap(reserved_prepend_size_, rhs.reserved_prepend_size_);
+ }
+
+ // Skip advances the reading index of the buffer
+ void Skip(size_t len)
+ {
+ if (len < length()) { read_index_ += len; }
+ else
+ {
+ Reset();
+ }
+ }
+
+ // Retrieve advances the reading index of the buffer
+ // Retrieve it the same as Skip.
+ void Retrieve(size_t len) { Skip(len); }
+
+ // Truncate discards all but the first n unread bytes from the buffer
+ // but continues to use the same allocated storage.
+ // It does nothing if n is greater than the length of the buffer.
+ void Truncate(size_t n)
+ {
+ if (n == 0)
+ {
+ read_index_ = reserved_prepend_size_;
+ write_index_ = reserved_prepend_size_;
+ }
+ else if (write_index_ > read_index_ + n)
+ {
+ write_index_ = read_index_ + n;
+ }
+ }
+
+ // Reset resets the buffer to be empty,
+ // but it retains the underlying storage for use by future writes.
+ // Reset is the same as Truncate(0).
+ void Reset() { Truncate(0); }
+
+ // Increase the capacity of the container to a value that's greater
+ // or equal to len. If len is greater than the current capacity(),
+ // new storage is allocated, otherwise the method does nothing.
+ void Reserve(size_t len)
+ {
+ if (capacity_ >= len + reserved_prepend_size_) { return; }
+
+ grow(len + reserved_prepend_size_);
+ }
+
+ // Make sure there is enough memory space to append more data with length
len
+ void EnsureWritableBytes(size_t len)
+ {
+ if (WritableBytes() < len) { grow(len); }
+
+ assert(WritableBytes() >= len);
+ }
+
+ // ToText appends char '\0' to buffer to convert the underlying data to a
c-style string text.
+ // It will not change the length of buffer.
+ void ToText()
+ {
+ AppendInt8('\0');
+ UnwriteBytes(1);
+ }
+
+ // Write
+ public:
+ void Write(const void* /*restrict*/ d, size_t len)
+ {
+ EnsureWritableBytes(len);
+ memcpy(WriteBegin(), d, len);
+ assert(write_index_ + len <= capacity_);
+ write_index_ += len;
+ }
+
+ void Append(const char* /*restrict*/ d, size_t len) { Write(d, len); }
+
+ void Append(const void* /*restrict*/ d, size_t len) { Write(d, len); }
+
+ void AppendInt32(int32_t x)
+ {
+ int32_t be32 = htonl(x);
+ Write(&be32, sizeof be32);
+ }
+
+ void AppendInt16(int16_t x)
+ {
+ int16_t be16 = htons(x);
+ Write(&be16, sizeof be16);
+ }
+
+ void AppendInt8(int8_t x) { Write(&x, sizeof x); }
+
+ void PrependInt32(int32_t x)
+ {
+ int32_t be32 = htonl(x);
+ Prepend(&be32, sizeof be32);
+ }
+
+ void PrependInt16(int16_t x)
+ {
+ int16_t be16 = htons(x);
+ Prepend(&be16, sizeof be16);
+ }
+
+ void PrependInt8(int8_t x) { Prepend(&x, sizeof x); }
+
+ // Insert content, specified by the parameter, into the front of reading
index
+ void Prepend(const void* /*restrict*/ d, size_t len)
+ {
+ assert(len <= PrependableBytes());
+ read_index_ -= len;
+ const char* p = static_cast<const char*>(d);
+ memcpy(begin() + read_index_, p, len);
+ }
+
+ void UnwriteBytes(size_t n)
+ {
+ assert(n <= length());
+ write_index_ -= n;
+ }
+
+ void WriteBytes(size_t n)
+ {
+ assert(n <= WritableBytes());
+ write_index_ += n;
+ }
+
+ // Read
+ public:
+ // Peek int32_t/int16_t/int8_t with network endian
+
+ uint32_t ReadUint32()
+ {
+ uint32_t result = PeekUint32();
+ Skip(sizeof result);
+ return result;
+ }
+
+ int32_t ReadInt32()
+ {
+ int32_t result = PeekInt32();
+ Skip(sizeof result);
+ return result;
+ }
+
+ int16_t ReadInt16()
+ {
+ int16_t result = PeekInt16();
+ Skip(sizeof result);
+ return result;
+ }
+
+ uint16_t ReadUint16()
+ {
+ uint16_t result = PeekUint16();
+ Skip(sizeof result);
+ return result;
+ }
+
+ int8_t ReadInt8()
+ {
+ int8_t result = PeekInt8();
+ Skip(sizeof result);
+ return result;
+ }
+
+ uint8_t ReadUint8()
+ {
+ uint8_t result = PeekUint8();
+ Skip(sizeof result);
+ return result;
+ }
+
+ std::string ToString() const { return std::string(data(), length()); }
+
+ // ReadByte reads and returns the next byte from the buffer.
+ // If no byte is available, it returns '\0'.
+ char ReadByte()
+ {
+ assert(length() >= 1);
+
+ if (length() == 0) { return '\0'; }
+
+ return buffer_[read_index_++];
+ }
+
+ // UnreadBytes unreads the last n bytes returned
+ // by the most recent read operation.
+ void UnreadBytes(size_t n)
+ {
+ assert(n < read_index_);
+ read_index_ -= n;
+ }
+
+ // Peek
+ public:
+ // Peek int64_t/int32_t/int16_t/int8_t with network endian
+
+ uint32_t PeekUint32() const
+ {
+ // LOG_DEBUG("recv_buf->length() is%d", length());
+ assert(length() >= sizeof(uint32_t));
+ uint32_t be32 = 0;
+ ::memcpy(&be32, data(), sizeof be32);
+ return ntohl(be32);
+ // LOG_DEBUG
+ }
+
+ int32_t PeekInt32() const
+ {
+ assert(length() >= sizeof(int32_t));
+ int32_t be32 = 0;
+ ::memcpy(&be32, data(), sizeof be32);
+ return ntohl(be32);
+ }
+
+ int16_t PeekInt16() const
+ {
+ assert(length() >= sizeof(int16_t));
+ int16_t be16 = 0;
+ ::memcpy(&be16, data(), sizeof be16);
+ return ntohs(be16);
+ }
+
+ uint16_t PeekUint16() const
+ {
+ assert(length() >= sizeof(uint16_t));
+ uint16_t be16 = 0;
+ ::memcpy(&be16, data(), sizeof be16);
+ return ntohs(be16);
+ }
+
+ int8_t PeekInt8() const
+ {
+ assert(length() >= sizeof(int8_t));
+ int8_t x = *data();
+ return x;
+ }
+
+ uint8_t PeekUint8() const
+ {
+ assert(length() >= sizeof(uint8_t));
+ uint8_t x = *data();
+ return x;
+ }
+
+ public:
+ // data returns a pointer of length Buffer.length() holding the unread
portion of the buffer.
+ // The data is valid for use only until the next buffer modification (that
is,
+ // only until the next call to a method like Read, Write, Reset, or
Truncate).
+ // The data aliases the buffer content at least until the next buffer
modification,
+ // so immediate changes to the slice will affect the result of future
reads.
+ const char* data() const { return buffer_ + read_index_; }
+
+ char* WriteBegin() { return begin() + write_index_; }
+
+ const char* WriteBegin() const { return begin() + write_index_; }
+
+ // length returns the number of bytes of the unread portion of the buffer
+ size_t length() const
+ {
+ assert(write_index_ >= read_index_);
+ return write_index_ - read_index_;
+ }
+
+ // size returns the number of bytes of the unread portion of the buffer.
+ // It is the same as length().
+ size_t size() const { return length(); }
+
+ // capacity returns the capacity of the buffer's underlying byte slice,
that is, the
+ // total space allocated for the buffer's data.
+ size_t capacity() const { return capacity_; }
+
+ size_t WritableBytes() const
+ {
+ assert(capacity_ >= write_index_);
+ return capacity_ - write_index_;
+ }
+
+ size_t PrependableBytes() const { return read_index_; }
+
+ public:
+ char* begin() { return buffer_; }
+
+ const char* begin() const { return buffer_; }
+
+ void grow(size_t len)
+ {
+ if (!has_mem_) { return; }
+ if (WritableBytes() + PrependableBytes() < len +
reserved_prepend_size_)
+ {
+ // grow the capacity
+ size_t n = (capacity_ << 1) + len;
+ size_t m = length();
+ char* d = new char[n];
+ memcpy(d + reserved_prepend_size_, begin() + read_index_, m);
+ write_index_ = m + reserved_prepend_size_;
+ read_index_ = reserved_prepend_size_;
+ capacity_ = n;
+ delete[] buffer_;
+ buffer_ = d;
+ }
+ else
+ {
+ // move readable data to the front, make space inside buffer
+ assert(reserved_prepend_size_ < read_index_);
+ size_t readable = length();
+ memmove(begin() + reserved_prepend_size_, begin() + read_index_,
length());
+ read_index_ = reserved_prepend_size_;
+ write_index_ = read_index_ + readable;
+ assert(readable == length());
+ assert(WritableBytes() >= len);
+ }
+ }
+};
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_RECV_BUFFER_H_
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_constant.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_constant.h
new file mode 100644
index 000000000..ef2c9514a
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_constant.h
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_CONSTANT_H_
+#define DATAPROXY_SDK_BASE_CONSTANT_H_
+
+#include <stdint.h>
+#include <string>
+
+namespace dataproxy_sdk
+{
+ namespace constants
+ {
+ // load weight
+ static const int32_t kWeight[30] = {1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3,
3, 3, 3, 3, 6, 6, 6, 6, 6, 12, 12, 12, 12, 12, 48, 96, 192, 384, 1000};
+
+ static const int32_t kPrime[6] = {1, 3, 5, 7, 11, 13};
+ static const int32_t kPrimeSize = 6;
+
+ static const int32_t kMaxRequestTDMTimes = 4;
+ static const int32_t kMaxRetryConnection = 20;
//create conn failed more than 20 times,
start sendbuf callback
+ static const std::string kAttrFormat =
"__addcol1__reptime=yyyymmddHHMMSS&__addcol2_ip=xxx.xxx.xxx.xxx"; // msg_type 7
body's attr format
+ static const int32_t kAttrLen = kAttrFormat.size();
+
+ static const char kTDBusCAPIVersion[] = "dataproxy_sdk_cpp-v2.0.0";
+ static const char kLogName[] = "dataproxy_cpp.log";
+ static const uint16_t kBinaryMagic = 0xEE01;
+ static const uint32_t kBinPackMethod = 7; // msg>=7
+ static const uint8_t kBinSnappyFlag = 1 << 5; // snappy flag
+ static const int32_t kBackupBusNum = 4; // backup_proxy_num
+
+ static const int32_t kThreadNums = 10;
+ static const int32_t kSharedBufferNums=5;
+ static const bool kEnableBidIsolation = false;
+ static const int32_t kBufferNumPerBid = 5;
+ static const bool kEnablePack = true;
+ static const uint32_t kPackSize = 4096;
+ static const uint32_t kPackTimeout = 3000;
+ static const uint32_t kExtPackSize = 16384;
+ static const bool kEnableZip = true;
+ static const uint32_t kMinZipLen = 512;
+
+ static const bool kEnableRetry = true;
+ static const uint32_t kRetryInterval = 3000;
+ static const uint32_t kRetryNum = 3;
+ static const uint32_t kLogNum = 10;
+ static const uint32_t kLogSize = 10;
+ static const uint8_t kLogLevel = 1;
+ static const uint8_t kLogFileType = 2;
+ static const std::string kLogPath = "./logs/";
+ static const bool kLogEnableLimit = true;
+
+ static const std::string kProxyURL =
"http://127.0.0.1:8099/api/dataproxy_ip_v2";
+ static const bool kEnableProxyURLFromCluster = false;
+ static const std::string kBusClusterURL =
+
"http://127.0.0.1:8099/heartbeat/dataproxy_ip_v2?cluster_id=0&net_tag=normal";
+ static const uint32_t kProxyUpdateInterval = 10;
+ static const uint32_t kProxyURLTimeout = 2;
+ static const uint32_t kMaxActiveProxyNum = 3;
+
+ static const std::string kSerIP = "127.0.0.1";
+ static const uint32_t kMaxBufPool = 50 * 1024 * 1024;
+ static const uint32_t kMsgType = 7;
+
+ static const bool kEnableTCPNagle = true;
+ static const bool kEnableHeartBeat = true;
+ static const uint32_t kHeartBeatInterval = 60;
+ static const bool kEnableSetAffinity = false;
+ static const uint32_t kMaskCPUAffinity = 0xff;
+ static const bool kIsFromDC = false;
+ static const uint16_t kExtendField = 0;
+ static const std::string kNetTag = "all";
+
+ static const bool kNeedAuth = false;
+
+ // http basic auth
+ static const std::string kBasicAuthHeader = "Authorization:";
+ static const std::string kBasicAuthPrefix = "Basic";
+ static const std::string kBasicAuthSeparator = " ";
+ static const std::string kBasicAuthJoiner = ":";
+
+ } // namespace constants
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_CONSTANT_H_
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/singleton.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/singleton.h
new file mode 100644
index 000000000..6e927de87
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/singleton.h
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_SINGLETON_H_
+#define DATAPROXY_SDK_BASE_SINGLETON_H_
+
+#include <assert.h>
+#include <mutex>
+#include <thread>
+
+#include "noncopyable.h"
+
+namespace dataproxy_sdk {
+template <typename T>
+class Singleton : noncopyable {
+private:
+ static std::once_flag once_;
+ static T* value_;
+
+public:
+ Singleton() = delete;
+ ~Singleton() = delete;
+
+ static T& instance()
+ {
+ std::call_once(once_, Singleton::init);
+ assert(value_ != nullptr);
+
+ return *value_;
+ }
+
+private:
+ static void init()
+ {
+ value_ = new T();
+ }
+};
+
+template <typename T>
+std::once_flag Singleton<T>::once_;
+
+template <typename T>
+T* Singleton<T>::value_ = nullptr;
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_SINGLETON_H_
\ No newline at end of file
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.cc
new file mode 100644
index 000000000..0a832b914
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.cc
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "utils.h"
+
+#include <arpa/inet.h>
+#include <ctime>
+#include <curl/curl.h>
+#include <errno.h>
+#include <fstream>
+#include <iostream>
+#include <iterator>
+#include <net/if.h>
+#include <netinet/in.h>
+#include <pthread.h>
+#include <regex>
+#include <sstream>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <sys/sysinfo.h>
+#include <sys/time.h>
+
+#include "logger.h"
+#include "tc_api.h"
+namespace dataproxy_sdk
+{
+uint16_t Utils::sequence = 0;
+uint64_t Utils::last_msstamp = 0;
+char Utils::snowflake_id[35] = {0};
+char base64_table[] = {
+ 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
+ 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
+ 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
+ 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
+ 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
+ 'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
+ 'w', 'x', 'y', 'z', '0', '1', '2', '3',
+ '4', '5', '6', '7', '8', '9', '+', '/'
+ };
+
+void Utils::taskWaitTime(int32_t sec)
+{
+ struct timeval tv;
+ tv.tv_sec = sec;
+ tv.tv_usec = 0;
+ int err;
+ do
+ {
+ err = select(0, NULL, NULL, NULL, &tv);
+ } while (err < 0 && errno == EINTR);
+}
+
+uint64_t Utils::getCurrentMsTime()
+{
+ uint64_t ms_time = 0;
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ ms_time = ((uint64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000);
+ return ms_time;
+}
+uint64_t Utils::getCurrentWsTime()
+{
+ uint64_t ws_time = 0;
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ ws_time = ((uint64_t)tv.tv_sec * 1000000 + tv.tv_usec);
+ return ws_time;
+}
+
+std::string Utils::getFormatTime(uint64_t data_time)
+{
+ struct tm timeinfo;
+ char buffer[80];
+
+ // time(&rawtime);
+ time_t m_time = data_time / 1000;
+ localtime_r(&m_time, &timeinfo);
+
+ strftime(buffer, sizeof(buffer), "%Y%m%d%H%M%S", &timeinfo);
+
+ return std::string(buffer);
+}
+
+size_t Utils::zipData(const char* input, uint32_t input_len, std::string&
zip_res)
+{
+ size_t len_after_zip = snappy::Compress((char*)input, input_len, &zip_res);
+ LOG_TRACE("data zip: input len is %u, output len is %u.", input_len,
len_after_zip);
+ return len_after_zip;
+}
+
+char* Utils::getSnowflakeId()
+{
+ std::string local_host;
+ getFirstIpAddr(local_host);
+ uint64_t ipaddr = htonl(inet_addr(local_host.c_str()));
+ uint32_t pidid = static_cast<uint16_t>((getpid() & 0xFFFF));
+ uint32_t selfid = static_cast<uint16_t>((pthread_self() & 0xFFFF00) >> 8);
+
+ // 22bit ms
+ uint64_t sequence_mask = -1LL ^ (-1LL << 22);
+
+ uint64_t time_id = 0LL;
+ uint64_t local_id = (ipaddr << 32) | (pidid << 16) | (selfid);
+
+ uint64_t since_date = 1288834974657LL; // Thu, 04 Nov 2010 01:42:54 GMT
+
+ // 41bit ms
+ uint64_t msstamp = getCurrentMsTime();
+
+ uint64_t rand = 0;
+ uint64_t rand_mask = -1LL ^ (-1LL << (32 + 5 + 12));
+
+ // error timestap
+ if (msstamp < last_msstamp)
+ {
+ LOG_ERROR("ms(%llx) time less last(%llx).", msstamp, last_msstamp);
+
+ //last ms
+ last_msstamp = msstamp;
+
+ srand(static_cast<uint32_t>(msstamp));
+ rand = random();
+
+ // generate id
+ time_id = ((msstamp - since_date) << 22 | (rand & rand_mask));
+
+ snprintf(&snowflake_id[0], sizeof(snowflake_id), "0x%.16llx%.16llx",
local_id, time_id);
+ return &snowflake_id[0];
+ }
+
+ // increase id
+ if (last_msstamp == msstamp)
+ {
+ sequence = (sequence + 1) & sequence_mask;
+
+ if (0 == sequence) { msstamp = waitNextMills(last_msstamp); }
+ }
+ else
+ {
+ sequence = 0;
+ }
+
+ last_msstamp = msstamp;
+
+ time_id = (((msstamp - since_date) << 22) | sequence);
+
+ LOG_TRACE("ms:0x%llx, ip:0x%.16llx, seq:0x:%x, selfid:%u, "
+ "local_id:0x%.16llx, time_id:0x%.16llx.",
+ (msstamp - since_date) << (22), ipaddr, sequence,
static_cast<uint32_t>(pthread_self()), local_id, time_id);
+
+ snprintf(&snowflake_id[0], sizeof(snowflake_id), "0x%.16llx%.16llx",
local_id, time_id);
+ return &snowflake_id[0];
+}
+
+int64_t Utils::waitNextMills(int64_t last_ms)
+{
+ int64_t msstamp = getCurrentMsTime();
+
+ while (msstamp <= last_ms)
+ {
+ msstamp = getCurrentMsTime();
+ }
+
+ return msstamp;
+}
+
+bool Utils::getFirstIpAddr(std::string& local_host)
+{
+ int32_t sockfd;
+ int32_t ip_num = 0;
+ char buf[1024] = {0};
+ struct ifreq* ifreq;
+ struct ifreq if_flag;
+ struct ifconf ifconf;
+
+ ifconf.ifc_len = sizeof(buf);
+ ifconf.ifc_buf = buf;
+ if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
+ {
+ LOG_ERROR("open the local socket(AF_INET, SOCK_DGRAM) failure!");
+ return false;
+ }
+
+ ioctl(sockfd, SIOCGIFCONF, &ifconf);
+
+ ifreq = (struct ifreq*)buf;
+ ip_num = ifconf.ifc_len / sizeof(struct ifreq);
+ for (int32_t i = 0; i < ip_num; i++, ifreq++)
+ {
+ //exclude ipv6 addr
+ if (ifreq->ifr_flags != AF_INET) { continue; }
+
+ if (0 == strncmp(&ifreq->ifr_name[0], "lo", sizeof("lo"))) { continue;
}
+
+ memcpy(&if_flag.ifr_name[0], &ifreq->ifr_name[0],
sizeof(ifreq->ifr_name));
+
+ if ((ioctl(sockfd, SIOCGIFFLAGS, (char*)&if_flag)) < 0) { continue; }
+
+ if ((if_flag.ifr_flags & IFF_LOOPBACK) || !(if_flag.ifr_flags &
IFF_UP)) { continue; }
+
+ if (!strncmp(inet_ntoa(((struct
sockaddr_in*)&(ifreq->ifr_addr))->sin_addr), "127.0.0.1", 7)) { continue; }
+
+ local_host = inet_ntoa(((struct
sockaddr_in*)&(ifreq->ifr_addr))->sin_addr);
+ close(sockfd);
+ return true;
+ }
+ close(sockfd);
+ // local_host = "127.0.0.1";
+ return false;
+}
+
+bool Utils::bindCPU(int32_t cpu_id)
+{
+ int32_t cpunum = get_nprocs();
+ int32_t cpucore = cpu_id;
+ cpu_set_t mask;
+
+ if (abs(cpu_id) > cpunum)
+ {
+ LOG_ERROR("mask<%d> more than total cpu num<%d>.", cpu_id, cpunum);
+ return false;
+ }
+
+ if (cpu_id < 0) { cpucore = cpunum + cpu_id; }
+
+ CPU_ZERO(&mask);
+ CPU_SET(cpucore, &mask);
+
+ if (sched_setaffinity(0, sizeof(mask), &mask) < 0) { LOG_ERROR("set CPU
affinity<%d>/<%d> errno<%d>", cpu_id, cpunum, errno); }
+
+ return true;
+}
+
+std::string Utils::base64_encode(const std::string& data)
+{
+ size_t in_len = data.size();
+ size_t out_len = 4 * ((in_len + 2) / 3);
+ std::string ret(out_len, '\0');
+ size_t i;
+ char *p = const_cast<char*>(ret.c_str());
+
+ for (i = 0; i < in_len - 2; i += 3) {
+ *p++ = base64_table[(data[i] >> 2) & 0x3F];
+ *p++ = base64_table[((data[i] & 0x3) << 4) | ((int) (data[i + 1] & 0xF0)
>> 4)];
+ *p++ = base64_table[((data[i + 1] & 0xF) << 2) | ((int) (data[i + 2] &
0xC0) >> 6)];
+ *p++ = base64_table[data[i + 2] & 0x3F];
+ }
+ if (i < in_len) {
+ *p++ = base64_table[(data[i] >> 2) & 0x3F];
+ if (i == (in_len - 1)) {
+ *p++ = base64_table[((data[i] & 0x3) << 4)];
+ *p++ = '=';
+ }
+ else {
+ *p++ = base64_table[((data[i] & 0x3) << 4) | ((int) (data[i + 1] &
0xF0) >> 4)];
+ *p++ = base64_table[((data[i + 1] & 0xF) << 2)];
+ }
+ *p++ = '=';
+ }
+ return ret;
+}
+
+std::string Utils::genBasicAuthCredential(const std::string& id, const
std::string& key)
+{
+ std::string credential = id + constants::kBasicAuthJoiner + key;
+ return constants::kBasicAuthPrefix + constants::kBasicAuthSeparator +
base64_encode(credential);
+}
+
+int32_t Utils::requestUrl(std::string& res, const HttpRequest* request)
+{
+ CURL* curl = NULL;
+ struct curl_slist* list = NULL;
+
+ curl_global_init(CURL_GLOBAL_ALL);
+
+ curl = curl_easy_init();
+ if (!curl)
+ {
+ LOG_ERROR("failed to init curl object");
+ return SDKInvalidResult::kErrorCURL;
+ }
+
+ // http header
+ list = curl_slist_append(list, "Content-Type:
application/x-www-form-urlencoded");
+ if ( request->need_auth && !request->auth_id.empty() &&
!request->auth_key.empty())
+ {
+ // Authorization: Basic xxxxxxxx
+ std::string auth = constants::kBasicAuthHeader +
constants::kBasicAuthSeparator + genBasicAuthCredential(request->auth_id,
request->auth_key);
+ LOG_INFO("request manager, auth-header:%s", auth.c_str());
+ list = curl_slist_append(list, auth.c_str());
+ }
+ curl_easy_setopt(curl, CURLOPT_HTTPHEADER, list);
+
+ //set url
+ curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
+ curl_easy_setopt(curl, CURLOPT_URL, request->url.c_str());
+ curl_easy_setopt(curl, CURLOPT_POSTFIELDS, request->post_data.c_str());
+ curl_easy_setopt(curl, CURLOPT_TIMEOUT, request->timeout);
+
+ //register callback and get res
+ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &Utils::getUrlResponse);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, &res);
+ curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
+
+ //execute curl request
+ CURLcode ret = curl_easy_perform(curl);
+ if (ret != 0)
+ {
+ LOG_ERROR("%s", curl_easy_strerror(ret));
+ LOG_ERROR("failed to request data from %s", request->url.c_str());
+ if (curl) curl_easy_cleanup(curl);
+ curl_global_cleanup();
+
+ return SDKInvalidResult::kErrorCURL;
+ }
+
+ int32_t code;
+ curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &code);
+ if (code != 200)
+ {
+ LOG_ERROR("tdm responsed with code %d", code);
+ if (curl) curl_easy_cleanup(curl);
+ curl_global_cleanup();
+
+ return SDKInvalidResult::kErrorCURL;
+ }
+
+ if (res.empty())
+ {
+ LOG_ERROR("tdm return empty data");
+ if (curl) curl_easy_cleanup(curl);
+ curl_global_cleanup();
+
+ return SDKInvalidResult::kErrorCURL;
+ }
+
+ //clean work
+ curl_easy_cleanup(curl);
+ curl_global_cleanup();
+
+ return 0;
+}
+
+size_t Utils::getUrlResponse(void* buffer, size_t size, size_t count, void*
response)
+{
+ std::string* str = (std::string*)response;
+ (*str).append((char*)buffer, size * count);
+
+ return size * count;
+}
+
+bool Utils::readFile(const std::string& file_path, std::string& content)
+{
+ std::ifstream f(file_path.c_str());
+ if (f.fail())
+ {
+ LOG_ERROR("fail to read file:%s, please check file path",
file_path.c_str());
+ return false;
+ }
+ std::stringstream ss;
+ ss << f.rdbuf();
+ content = ss.str();
+ return true;
+}
+
+static const char kWhitespaceCharSet[] = " \n\r\t\f\v";
+
+std::string Utils::trim(const std::string& source)
+{
+ std::string target = source;
+ if (!target.empty())
+ {
+ size_t foud_pos = target.find_first_not_of(kWhitespaceCharSet);
+ if (foud_pos != std::string::npos) { target = target.substr(foud_pos);
}
+ foud_pos = target.find_last_not_of(kWhitespaceCharSet);
+ if (foud_pos != std::string::npos) { target = target.substr(0,
foud_pos + 1); }
+ }
+ return target;
+}
+
+int32_t Utils::splitOperate(const std::string& source,
std::vector<std::string>& result, const std::string& delimiter)
+{
+ std::string item_str;
+ std::string::size_type pos1 = 0;
+ std::string::size_type pos2 = 0;
+ result.clear();
+ if (!source.empty())
+ {
+ pos1 = 0;
+ pos2 = source.find(delimiter);
+ while (std::string::npos != pos2)
+ {
+ item_str = trim(source.substr(pos1, pos2 - pos1));
+ pos1 = pos2 + delimiter.size();
+ pos2 = source.find(delimiter, pos1);
+ if (!item_str.empty()) { result.push_back(item_str); }
+ }
+ if (pos1 != source.length())
+ {
+ item_str = trim(source.substr(pos1));
+ if (!item_str.empty()) { result.push_back(item_str); }
+ }
+ }
+ return result.size();
+}
+
+std::string Utils::getVectorStr(std::vector<std::string>& vs)
+{
+ std::string res;
+ for (auto& it : vs)
+ {
+ res += it + ", ";
+ }
+ return res;
+}
+
+} // namespace dataproxy_sdk
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.h
new file mode 100644
index 000000000..e33f0ce59
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/utils.h
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_UTILS_H_
+#define DATAPROXY_SDK_BASE_UTILS_H_
+
+#include <snappy.h>
+#include <stdint.h>
+#include <string>
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <utility>
+#include <vector>
+namespace dataproxy_sdk
+{
+using PAIR = std::pair<std::string, int32_t>;
+struct HttpRequest
+{
+ std::string url;
+ uint32_t timeout;
+ bool need_auth;
+ std::string auth_id;
+ std::string auth_key;
+ std::string post_data;
+
+};
+
+class Utils
+{
+ private:
+ static char snowflake_id[35];
+ static uint16_t sequence;
+ static uint64_t last_msstamp;
+
+ public:
+ static void taskWaitTime(int32_t sec);
+ static uint64_t getCurrentMsTime();
+ static uint64_t getCurrentWsTime();
+ static std::string getFormatTime(uint64_t date_time);
//format time: yyyymmddHHMMSS
+ static size_t zipData(const char* input, uint32_t input_len, std::string&
zip_res); //snappy data
+ static char* getSnowflakeId();
//get 64bit snowflakeId
+ static bool getFirstIpAddr(std::string& local_host);
+ inline static bool isLegalTime(uint64_t report_time)
+ {
+ return ((report_time > 1435101567000LL) && (report_time <
4103101567000LL));
+ }
+ static bool bindCPU(int32_t cpu_id);
+ static std::string base64_encode(const std::string& data);
+ static std::string genBasicAuthCredential(const std::string& id, const
std::string& key);
+ static int32_t requestUrl(std::string& res, const HttpRequest* request);
+ static bool readFile(const std::string& file_path, std::string& content);
//read file content, save as res, return true is success
+ static int32_t splitOperate(const std::string& source,
std::vector<std::string>& result, const std::string& delimiter);
+ static std::string getVectorStr(std::vector<std::string>& vs);
+
+ static bool upValueSort(const PAIR& lhs, const PAIR& rhs) { return
lhs.second < rhs.second; }
+ static bool downValueSort(const PAIR& lhs, const PAIR& rhs) { return
lhs.second > rhs.second; }
+
+ private:
+ static size_t getUrlResponse(void* buffer, size_t size, size_t count,
void* response);
+ static int64_t waitNextMills(int64_t last_ms);
+ static std::string trim(const std::string& source);
+};
+
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_UTILS_H_
\ No newline at end of file