This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6f145dad18 [INLONG-8864][SDK] Add memory utils for dataproxy cpp sdk
(#8865)
6f145dad18 is described below
commit 6f145dad18db198e6a9a0b59dad61e9006f09330
Author: doleyzi <[email protected]>
AuthorDate: Sat Sep 9 18:05:42 2023 +0800
[INLONG-8864][SDK] Add memory utils for dataproxy cpp sdk (#8865)
Co-authored-by: doleyzi <[email protected]>
---
.../dataproxy-sdk-cpp/src/utils/block_memory.h | 66 +++
.../dataproxy-sdk-cpp/src/utils/send_buffer.h | 149 +++++++
.../dataproxy-sdk-cpp/src/utils/utils.cc | 442 +++++++++++++++++++++
.../dataproxy-sdk-cpp/src/utils/utils.h | 92 +++++
4 files changed, 749 insertions(+)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/block_memory.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/block_memory.h
new file mode 100644
index 0000000000..8c83af5e2c
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/block_memory.h
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef INLONG_SDK_BLOCK_MEMORY_H
+#define INLONG_SDK_BLOCK_MEMORY_H
+
+#include "memory.h"
+#include "noncopyable.h"
+#include <memory>
+#include <string>
+namespace inlong {
+
+/**
+ * Fixed-capacity byte buffer allocated on the heap.
+ *
+ * The object owns m_data (released in the destructor), tracks how many bytes
+ * are in use in m_len, and remembers the allocated capacity in m_max_size.
+ * All fields are public; the class itself is non-copyable.
+ */
+class BlockMemory : noncopyable {
+public:
+  // Allocates `size` bytes (32 KiB when omitted). The buffer content is left
+  // uninitialised and the used length starts at zero.
+  explicit BlockMemory(const uint64_t size = 32 * 1024,
+                       const std::string &id = "")
+      : m_data(new char[size]), m_len(0), m_max_size(size), id_(id) {}
+
+  // delete[] accepts a null pointer, so no guard is required here.
+  ~BlockMemory() { delete[] m_data; }
+
+  // Copies the used bytes of `other` into this block when they fit.
+  void CopyFrom(const BlockMemory &other);
+
+  // Number of bytes still unused in this block.
+  uint64_t GetFree();
+
+public:
+  char *m_data;              // owned storage of m_max_size bytes
+  uint64_t m_len;            // bytes currently in use
+  const uint64_t m_max_size; // capacity fixed at construction
+  const std::string id_;     // caller-supplied identifier
+};
+using BlockMemoryPtrT = std::shared_ptr<BlockMemory>;
+
+// Copies the used portion of `other` into this block and adopts its length.
+// Nothing happens when `other` is this very object or when its used length
+// exceeds this block's capacity.
+inline void BlockMemory::CopyFrom(const BlockMemory &other) {
+  const bool is_self = (&other == this);
+  if (is_self || other.m_len > m_max_size) {
+    return;
+  }
+
+  const size_t byte_count = static_cast<size_t>(other.m_len);
+  memcpy(m_data, other.m_data, byte_count);
+  m_len = other.m_len;
+}
+
+// Remaining capacity: allocated size minus the bytes already in use.
+inline uint64_t BlockMemory::GetFree() {
+  const uint64_t unused = m_max_size - m_len;
+  return unused;
+}
+} // namespace inlong
+#endif // INLONG_SDK_BLOCK_MEMORY_H
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
new file mode 100644
index 0000000000..4aa2f55cf2
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef INLONG_SDK_SEND_BUFFER_H
+#define INLONG_SDK_SEND_BUFFER_H
+
+#include <mutex>
+#include <string>
+
+#include "sdk_core.h"
+// #include "executor_thread_pool.h"
+#include "logger.h"
+#include "noncopyable.h"
+// #include "socket_connection.h"
+
+namespace dataproxy_sdk {
+/**
+ * Fixed-size buffer holding one outgoing payload plus its send bookkeeping:
+ * group/stream ids, message count, send timestamps, retry counter, target
+ * connection, timeout timer and the user messages that were added to it.
+ *
+ * The payload storage is allocated once in the constructor and released in
+ * the destructor; reset() returns the object to its freshly-built state so
+ * it can be reused. The public mutex_ is provided for callers; no member
+ * function of this class locks it.
+ */
+class SendBuffer : noncopyable {
+private:
+  uint32_t uniq_id_;
+  bool is_used_;
+  bool is_packed_; // is packed completed
+  char *content_;  // send data
+  uint32_t size_;  // buf_size
+  int32_t msg_cnt_;
+  uint32_t len_; // send data len
+  std::string inlong_group_id_;
+  std::string inlong_stream_id_;
+  AtomicInt already_send_;
+  uint64_t first_send_time_;  // ms
+  uint64_t latest_send_time_; // ms
+  ConnectionPtr target_;      // send conn
+  std::vector<UserMsgPtr> user_msg_set_;
+
+public:
+  std::mutex mutex_;
+  SteadyTimerPtr timeout_timer_; // timeout, resend
+  AtomicInt fail_create_conn_;   // create conn fail count
+
+public:
+  // Allocates a zero-filled payload buffer of `size` bytes.
+  // Initialisers are listed in declaration order (the order in which the
+  // compiler actually runs them); `new char[size]()` value-initialises the
+  // array, which replaces the former memset.
+  SendBuffer(uint32_t size)
+      : uniq_id_(0), is_used_(false), is_packed_(false),
+        content_(new char[size]()), size_(size), msg_cnt_(0), len_(0),
+        inlong_group_id_(), inlong_stream_id_(), first_send_time_(0),
+        latest_send_time_(0), target_(nullptr), mutex_(),
+        timeout_timer_(nullptr) {}
+
+  ~SendBuffer() {
+    delete[] content_; // safe on nullptr
+    content_ = nullptr;
+  }
+
+  char *content() { return content_; }
+  int32_t msgCnt() const { return msg_cnt_; }
+  void setMsgCnt(const int32_t &msg_cnt) { msg_cnt_ = msg_cnt; }
+  uint32_t len() const { return len_; }
+  void setLen(const uint32_t len) { len_ = len; }
+  std::string inlong_group_id() const { return inlong_group_id_; }
+  std::string inlong_stream_id() const { return inlong_stream_id_; }
+  void setGroupid(const std::string &inlong_group_id) {
+    inlong_group_id_ = inlong_group_id;
+  }
+  void setStreamid(const std::string &inlong_stream_id) {
+    inlong_stream_id_ = inlong_stream_id;
+  }
+  uint64_t firstSendTime() const { return first_send_time_; }
+  void setFirstSendTime(const uint64_t &first_send_time) {
+    first_send_time_ = first_send_time;
+  }
+  uint64_t latestSendTime() const { return latest_send_time_; }
+  void setLatestSendTime(const uint64_t &latest_send_time) {
+    latest_send_time_ = latest_send_time;
+  }
+
+  ConnectionPtr target() const { return target_; }
+  // Takes a const reference so temporaries and const handles are accepted.
+  void setTarget(const ConnectionPtr &target) { target_ = target; }
+
+  inline void increaseRetryNum() { already_send_.increment(); }
+  inline int32_t getAlreadySend() { return already_send_.get(); }
+
+  uint32_t uniqId() const { return uniq_id_; }
+  void setUniqId(const uint32_t &uniq_id) { uniq_id_ = uniq_id; }
+
+  void addUserMsg(UserMsgPtr u_msg) { user_msg_set_.push_back(u_msg); }
+
+  // Invokes the callback of every stored user message that has one, passing
+  // this buffer's group/stream ids and the message's own payload, report
+  // time and client ip.
+  void doUserCallBack() {
+    LOG_TRACE("failed to send msg, start user call_back");
+    // iterate by reference: no per-element handle copy
+    for (const auto &user_msg : user_msg_set_) {
+      if (user_msg->cb) {
+        user_msg->cb(inlong_group_id_.data(), inlong_stream_id_.data(),
+                     user_msg->msg.data(), user_msg->msg.size(),
+                     user_msg->user_report_time,
+                     user_msg->user_client_ip.data());
+      }
+    }
+  }
+
+  // Restores the just-constructed state: zeroes the payload, clears ids,
+  // counters, timestamps, target and user messages, and cancels a pending
+  // timeout timer if one is set.
+  void reset() {
+    uint32_t record_uid = uniq_id_; // for debug
+
+    uniq_id_ = 0;
+    is_used_ = false;
+    is_packed_ = false;
+    memset(content_, 0x0, size_);
+    msg_cnt_ = 0;
+    len_ = 0;
+    inlong_group_id_ = "";
+    inlong_stream_id_ = "";
+    already_send_.getAndSet(0);
+    first_send_time_ = 0;
+    latest_send_time_ = 0;
+    target_ = nullptr;
+    if (timeout_timer_) {
+      timeout_timer_->cancel();
+      timeout_timer_ = nullptr;
+    }
+    user_msg_set_.clear();
+    fail_create_conn_.getAndSet(0);
+    // record_uid is unsigned, so it is printed with %u
+    LOG_TRACE("reset senfbuf(uid:%u) successfully", record_uid);
+  }
+
+  bool isPacked() const { return is_packed_; }
+  void setIsPacked(bool is_packed) { is_packed_ = is_packed; }
+
+  bool isUsed() const { return is_used_; }
+  void setIsUsed(bool is_used) { is_used_ = is_used; }
+};
+
+} // namespace dataproxy_sdk
+
+#endif // INLONG_SDK_SEND_BUFFER_H
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc
new file mode 100644
index 0000000000..5a80740e11
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.cc
@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "utils.h"
+
+#include <arpa/inet.h>
+#include <ctime>
+#include <errno.h>
+#include <fstream>
+#include <iostream>
+#include <iterator>
+#include <net/if.h>
+#include <netinet/in.h>
+#include <pthread.h>
+#include <regex>
+#include <sstream>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <sys/sysinfo.h>
+#include <sys/time.h>
+
+#include "curl/curl.h"
+#include "logger.h"
+#include "tc_api.h"
+namespace inlong {
+// Storage for the Utils static members used by getSnowflakeId().
+uint16_t Utils::sequence = 0;
+uint64_t Utils::last_msstamp = 0;
+char Utils::snowflake_id[35] = {0};
+
+// Base64 alphabet indexed by 6-bit value. It is only read by this file, so
+// it is const with internal linkage instead of a writable exported global.
+static const char base64_table[] = {
+    'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
+    'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
+    'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
+    'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
+    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '+', '/'};
+
+// Blocks the calling thread using select() with no descriptors and a timeout
+// of `sec` seconds. The call is issued again whenever it fails with EINTR.
+void Utils::taskWaitTime(int32_t sec) {
+  struct timeval timeout;
+  timeout.tv_sec = sec;
+  timeout.tv_usec = 0;
+
+  for (;;) {
+    const int rc = select(0, NULL, NULL, NULL, &timeout);
+    if (rc >= 0 || errno != EINTR) {
+      break;
+    }
+  }
+}
+
+// Current gettimeofday() reading expressed in milliseconds.
+uint64_t Utils::getCurrentMsTime() {
+  struct timeval now;
+  gettimeofday(&now, NULL);
+
+  const uint64_t whole_seconds_ms = (uint64_t)now.tv_sec * 1000;
+  return whole_seconds_ms + now.tv_usec / 1000;
+}
+// Current gettimeofday() reading expressed in microseconds.
+uint64_t Utils::getCurrentWsTime() {
+  struct timeval now;
+  gettimeofday(&now, NULL);
+
+  const uint64_t whole_seconds_us = (uint64_t)now.tv_sec * 1000000;
+  return whole_seconds_us + now.tv_usec;
+}
+
+// Formats `data_time` (divided by 1000 to obtain seconds, i.e. treated as a
+// millisecond timestamp) as local time in the form yyyymmddHHMMSS.
+std::string Utils::getFormatTime(uint64_t data_time) {
+  const time_t seconds = data_time / 1000;
+
+  struct tm local_parts;
+  localtime_r(&seconds, &local_parts);
+
+  char text[80];
+  strftime(text, sizeof(text), "%Y%m%d%H%M%S", &local_parts);
+  return std::string(text);
+}
+
+/**
+ * Compresses `input_len` bytes starting at `input` with snappy.
+ *
+ * @param input      data to compress
+ * @param input_len  number of bytes to read from `input`
+ * @param zip_res    receives the compressed bytes
+ * @return the size reported by snappy::Compress
+ */
+size_t Utils::zipData(const char *input, uint32_t input_len,
+                      std::string &zip_res) {
+  // snappy::Compress takes a const char*, so no cast is needed.
+  const size_t len_after_zip = snappy::Compress(input, input_len, &zip_res);
+  // size_t must be printed with %zu; %u reads the wrong width on LP64.
+  LOG_TRACE("data zip: input len is %u, output len is %zu.", input_len,
+            len_after_zip);
+  return len_after_zip;
+}
+
+/**
+ * Writes an id of the form "0x" + 16 hex digits (local_id) + 16 hex digits
+ * (time_id) into the static snowflake_id buffer and returns that buffer.
+ *
+ *  - local_id: first local IPv4 address in the high 32 bits, low 16 bits of
+ *    the pid, then 16 bits taken from pthread_self().
+ *  - time_id : milliseconds since 2010-11-04 01:42:54 GMT shifted left by
+ *    22, OR-ed with a per-millisecond sequence number.
+ *
+ * The returned pointer refers to shared static storage and the function
+ * updates static counters without any locking visible here, so the result
+ * is overwritten by the next call.
+ */
+char *Utils::getSnowflakeId() {
+  std::string local_host;
+  getFirstIpAddr(local_host);
+  uint64_t ipaddr = htonl(inet_addr(local_host.c_str()));
+  uint32_t pidid = static_cast<uint16_t>((getpid() & 0xFFFF));
+  uint32_t selfid = static_cast<uint16_t>((pthread_self() & 0xFFFF00) >> 8);
+
+  // Masks are built from unsigned values; shifting a negative value left is
+  // undefined behaviour before C++20. The resulting bit patterns are the
+  // same as before: the low 22 bits and the low 49 bits respectively.
+  const uint64_t sequence_mask = (1ULL << 22) - 1;
+  const uint64_t rand_mask = (1ULL << (32 + 5 + 12)) - 1;
+
+  const uint64_t since_date = 1288834974657ULL; // Thu, 04 Nov 2010 01:42:54 GMT
+  const uint64_t local_id = (ipaddr << 32) | (pidid << 16) | (selfid);
+
+  uint64_t time_id = 0;
+  uint64_t msstamp = getCurrentMsTime();
+
+  if (msstamp < last_msstamp) {
+    // The clock went backwards: fall back to a random low part.
+    LOG_ERROR("ms(%llx) time less last(%llx).",
+              static_cast<unsigned long long>(msstamp),
+              static_cast<unsigned long long>(last_msstamp));
+
+    last_msstamp = msstamp;
+
+    // random() is seeded by srandom(); srand() only seeds rand().
+    srandom(static_cast<uint32_t>(msstamp));
+    const uint64_t rand = random();
+
+    // NOTE(review): rand_mask is 49 bits wide, so the random value can
+    // overlap the shifted timestamp bits. Kept as-is; confirm the intent.
+    time_id = ((msstamp - since_date) << 22 | (rand & rand_mask));
+  } else {
+    if (last_msstamp == msstamp) {
+      // Same millisecond: bump the sequence. `sequence` is a uint16_t, so
+      // it wraps to 0 after 65535 and then waits for the next millisecond.
+      sequence = (sequence + 1) & sequence_mask;
+
+      if (0 == sequence) {
+        msstamp = waitNextMills(last_msstamp);
+      }
+    } else {
+      sequence = 0;
+    }
+
+    last_msstamp = msstamp;
+
+    time_id = (((msstamp - since_date) << 22) | sequence);
+
+    LOG_TRACE("ms:0x%llx, ip:0x%.16llx, seq:0x:%x, selfid:%u, "
+              "local_id:0x%.16llx, time_id:0x%.16llx.",
+              static_cast<unsigned long long>((msstamp - since_date) << (22)),
+              static_cast<unsigned long long>(ipaddr), sequence,
+              static_cast<uint32_t>(pthread_self()),
+              static_cast<unsigned long long>(local_id),
+              static_cast<unsigned long long>(time_id));
+  }
+
+  // "0x" + 16 + 16 hex digits + NUL fits the 35-byte buffer exactly.
+  snprintf(&snowflake_id[0], sizeof(snowflake_id), "0x%.16llx%.16llx",
+           static_cast<unsigned long long>(local_id),
+           static_cast<unsigned long long>(time_id));
+  return &snowflake_id[0];
+}
+
+// Polls getCurrentMsTime() in a tight loop until it reports a value greater
+// than `last_ms`, then returns that value.
+int64_t Utils::waitNextMills(int64_t last_ms) {
+  int64_t now_ms;
+  do {
+    now_ms = getCurrentMsTime();
+  } while (now_ms <= last_ms);
+
+  return now_ms;
+}
+
+/**
+ * Finds the first IPv4 address of a local interface that is up and is not a
+ * loopback interface.
+ *
+ * @param local_host receives the dotted-quad address on success; it is left
+ *                   untouched on failure
+ * @return true when an address was found, false otherwise (including when
+ *         the socket or the interface query fails)
+ */
+bool Utils::getFirstIpAddr(std::string &local_host) {
+  char buf[1024] = {0};
+  struct ifconf ifconf;
+  ifconf.ifc_len = sizeof(buf);
+  ifconf.ifc_buf = buf;
+
+  const int32_t sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+  if (sockfd < 0) {
+    LOG_ERROR("open the local socket(AF_INET, SOCK_DGRAM) failure!");
+    return false;
+  }
+
+  // Without a successful SIOCGIFCONF the buffer holds no interface data.
+  if (ioctl(sockfd, SIOCGIFCONF, &ifconf) < 0) {
+    LOG_ERROR("ioctl(SIOCGIFCONF) failure, errno<%d>", errno);
+    close(sockfd);
+    return false;
+  }
+
+  struct ifreq *ifreq = reinterpret_cast<struct ifreq *>(buf);
+  const int32_t ip_num =
+      static_cast<int32_t>(ifconf.ifc_len / sizeof(struct ifreq));
+  for (int32_t i = 0; i < ip_num; i++, ifreq++) {
+    // SIOCGIFCONF fills ifr_addr, so the family is read from there rather
+    // than through the aliasing ifr_flags union member. Skips non-IPv4.
+    if (ifreq->ifr_addr.sa_family != AF_INET) {
+      continue;
+    }
+
+    // sizeof("lo") includes the terminator, so this is an exact name match.
+    if (0 == strncmp(&ifreq->ifr_name[0], "lo", sizeof("lo"))) {
+      continue;
+    }
+
+    struct ifreq if_flag;
+    memset(&if_flag, 0, sizeof(if_flag));
+    memcpy(&if_flag.ifr_name[0], &ifreq->ifr_name[0], sizeof(ifreq->ifr_name));
+
+    if ((ioctl(sockfd, SIOCGIFFLAGS, (char *)&if_flag)) < 0) {
+      continue;
+    }
+
+    if ((if_flag.ifr_flags & IFF_LOOPBACK) || !(if_flag.ifr_flags & IFF_UP)) {
+      continue;
+    }
+
+    // inet_ntoa returns a static buffer; copy it once and reuse the copy.
+    const std::string addr = inet_ntoa(
+        reinterpret_cast<struct sockaddr_in *>(&(ifreq->ifr_addr))->sin_addr);
+    // Only the first 7 characters ("127.0.0") are compared.
+    if (!strncmp(addr.c_str(), "127.0.0.1", 7)) {
+      continue;
+    }
+
+    local_host = addr;
+    close(sockfd);
+    return true;
+  }
+  close(sockfd);
+  // local_host = "127.0.0.1";
+  return false;
+}
+
+/**
+ * Pins the calling thread to a single CPU core.
+ *
+ * @param cpu_id core index in [0, cpunum); a negative value counts from the
+ *               end, so -1 is the last core and -cpunum is core 0
+ * @return true when the affinity was applied; false when `cpu_id` is out of
+ *         range or sched_setaffinity fails
+ */
+bool Utils::bindCPU(int32_t cpu_id) {
+  const int32_t cpunum = get_nprocs();
+
+  // Valid cores are 0..cpunum-1, so cpu_id == cpunum must be rejected too.
+  // The explicit range test also avoids abs(INT32_MIN).
+  if (cpu_id >= cpunum || cpu_id < -cpunum) {
+    LOG_ERROR("mask<%d> more than total cpu num<%d>.", cpu_id, cpunum);
+    return false;
+  }
+
+  const int32_t cpucore = (cpu_id < 0) ? (cpunum + cpu_id) : cpu_id;
+
+  cpu_set_t mask;
+  CPU_ZERO(&mask);
+  CPU_SET(cpucore, &mask);
+
+  if (sched_setaffinity(0, sizeof(mask), &mask) < 0) {
+    LOG_ERROR("set CPU affinity<%d>/<%d> errno<%d>", cpu_id, cpunum, errno);
+    // Report the failure instead of claiming success.
+    return false;
+  }
+
+  return true;
+}
+
+/**
+ * Encodes `data` as Base64 using the alphabet in base64_table, padding the
+ * output with '=' to a multiple of four characters.
+ *
+ * @param data raw bytes to encode; may be empty
+ * @return the encoded text (empty for empty input)
+ */
+std::string Utils::base64_encode(const std::string &data) {
+  const size_t in_len = data.size();
+  std::string ret;
+  ret.reserve(4 * ((in_len + 2) / 3));
+
+  // Full 3-byte groups. `i + 2 < in_len` cannot underflow, unlike
+  // `i < in_len - 2`, which wraps around for inputs shorter than 2 bytes.
+  size_t i = 0;
+  for (; i + 2 < in_len; i += 3) {
+    const unsigned char b0 = static_cast<unsigned char>(data[i]);
+    const unsigned char b1 = static_cast<unsigned char>(data[i + 1]);
+    const unsigned char b2 = static_cast<unsigned char>(data[i + 2]);
+    ret.push_back(base64_table[b0 >> 2]);
+    ret.push_back(base64_table[((b0 & 0x3) << 4) | (b1 >> 4)]);
+    ret.push_back(base64_table[((b1 & 0xF) << 2) | (b2 >> 6)]);
+    ret.push_back(base64_table[b2 & 0x3F]);
+  }
+
+  // One or two trailing bytes.
+  if (i < in_len) {
+    const unsigned char b0 = static_cast<unsigned char>(data[i]);
+    ret.push_back(base64_table[b0 >> 2]);
+    if (i + 1 == in_len) {
+      // one byte left -> two symbols and "=="
+      ret.push_back(base64_table[(b0 & 0x3) << 4]);
+      ret.push_back('=');
+    } else {
+      // two bytes left -> three symbols and "="
+      const unsigned char b1 = static_cast<unsigned char>(data[i + 1]);
+      ret.push_back(base64_table[((b0 & 0x3) << 4) | (b1 >> 4)]);
+      ret.push_back(base64_table[(b1 & 0xF) << 2]);
+    }
+    ret.push_back('=');
+  }
+  return ret;
+}
+
+// Builds the credential value: kBasicAuthPrefix, kBasicAuthSeparator, then
+// the Base64 encoding of `id` + kBasicAuthJoiner + `key`.
+std::string Utils::genBasicAuthCredential(const std::string &id,
+                                          const std::string &key) {
+  std::string header_value = constants::kBasicAuthPrefix;
+  header_value.append(constants::kBasicAuthSeparator);
+
+  const std::string plain = id + constants::kBasicAuthJoiner + key;
+  header_value.append(base64_encode(plain));
+  return header_value;
+}
+
+/**
+ * Sends an HTTP POST described by `request` with libcurl and appends the
+ * response body to `res`.
+ *
+ * @param res     receives the response body (appended by getUrlResponse)
+ * @param request url, post data, timeout and optional basic-auth settings
+ * @return 0 on success; SDKInvalidResult::kErrorCURL when the request is
+ *         null, curl cannot be initialised, the transfer fails, the status
+ *         code is not 200, or `res` is empty afterwards
+ */
+int32_t Utils::requestUrl(std::string &res, const HttpRequest *request) {
+  if (request == NULL) {
+    LOG_ERROR("invalid http request: null");
+    return SDKInvalidResult::kErrorCURL;
+  }
+
+  curl_global_init(CURL_GLOBAL_ALL);
+
+  CURL *curl = curl_easy_init();
+  if (!curl) {
+    LOG_ERROR("failed to init curl object");
+    curl_global_cleanup(); // balance curl_global_init on this path too
+    return SDKInvalidResult::kErrorCURL;
+  }
+
+  // http header
+  struct curl_slist *list = curl_slist_append(
+      NULL, "Content-Type: application/x-www-form-urlencoded");
+  if (request->need_auth && !request->auth_id.empty() &&
+      !request->auth_key.empty()) {
+    // Authorization: Basic xxxxxxxx
+    std::string auth = constants::kBasicAuthHeader;
+    auth.append(constants::kBasicAuthSeparator);
+    auth.append(genBasicAuthCredential(request->auth_id, request->auth_key));
+    // NOTE(review): this writes the encoded credential to the log; consider
+    // redacting it.
+    LOG_INFO("request manager, auth-header:%s", auth.c_str());
+    list = curl_slist_append(list, auth.c_str());
+  }
+  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, list);
+
+  // set url
+  curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
+  curl_easy_setopt(curl, CURLOPT_URL, request->url.c_str());
+  curl_easy_setopt(curl, CURLOPT_POSTFIELDS, request->post_data.c_str());
+  // curl_easy_setopt is variadic and expects a long for this option.
+  curl_easy_setopt(curl, CURLOPT_TIMEOUT,
+                   static_cast<long>(request->timeout));
+
+  // register callback and get res
+  curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &Utils::getUrlResponse);
+  curl_easy_setopt(curl, CURLOPT_WRITEDATA, &res);
+  curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
+
+  int32_t result = 0;
+
+  // execute curl request
+  const CURLcode ret = curl_easy_perform(curl);
+  if (ret != CURLE_OK) {
+    LOG_ERROR("%s", curl_easy_strerror(ret));
+    LOG_ERROR("failed to request data from %s", request->url.c_str());
+    result = SDKInvalidResult::kErrorCURL;
+  } else {
+    // CURLINFO_RESPONSE_CODE stores a long; a 32-bit variable would be
+    // overrun on platforms where long is 64 bits wide.
+    long code = 0;
+    curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &code);
+    if (code != 200) {
+      LOG_ERROR("tdm responsed with code %ld", code);
+      result = SDKInvalidResult::kErrorCURL;
+    } else if (res.empty()) {
+      LOG_ERROR("tdm return empty data");
+      result = SDKInvalidResult::kErrorCURL;
+    }
+  }
+
+  // clean work: single exit path, and the header list is released as well
+  curl_slist_free_all(list);
+  curl_easy_cleanup(curl);
+  curl_global_cleanup();
+
+  return result;
+}
+
+// Write callback registered with CURLOPT_WRITEFUNCTION: appends the
+// size * count bytes at `buffer` to the std::string that `response` points
+// to and returns that byte count.
+size_t Utils::getUrlResponse(void *buffer, size_t size, size_t count,
+                             void *response) {
+  const size_t byte_count = size * count;
+  std::string *sink = (std::string *)response;
+  sink->append((char *)buffer, byte_count);
+  return byte_count;
+}
+
+// Reads the whole file at `file_path` into `content`.
+// Returns false (after logging) when the stream fails to open; `content` is
+// not modified in that case.
+bool Utils::readFile(const std::string &file_path, std::string &content) {
+  std::ifstream input(file_path.c_str());
+  if (!input.fail()) {
+    std::stringstream collected;
+    collected << input.rdbuf();
+    content = collected.str();
+    return true;
+  }
+
+  LOG_ERROR("fail to read file:%s, please check file path",
+            file_path.c_str());
+  return false;
+}
+
+// Characters removed by trim(): space, newline, carriage return, tab,
+// form feed and vertical tab.
+static const char kWhitespaceCharSet[] = " \n\r\t\f\v";
+
+/**
+ * Returns a copy of `source` without leading and trailing whitespace.
+ *
+ * A string that is empty or consists only of whitespace yields an empty
+ * string; previously such input was returned unchanged because neither
+ * search found a non-whitespace character.
+ */
+std::string Utils::trim(const std::string &source) {
+  const size_t first = source.find_first_not_of(kWhitespaceCharSet);
+  if (first == std::string::npos) {
+    return std::string();
+  }
+  // A non-whitespace character exists, so this search cannot return npos.
+  const size_t last = source.find_last_not_of(kWhitespaceCharSet);
+  return source.substr(first, last - first + 1);
+}
+
+/**
+ * Splits `source` on `delimiter`, trims every piece and stores the
+ * non-empty pieces in `result` (which is cleared first).
+ *
+ * @param source    text to split
+ * @param result    receives the trimmed, non-empty items in order
+ * @param delimiter separator text; when empty, `source` is treated as a
+ *                  single item (an empty delimiter matches at every
+ *                  position and used to make the loop spin forever)
+ * @return number of items stored in `result`
+ */
+int32_t Utils::splitOperate(const std::string &source,
+                            std::vector<std::string> &result,
+                            const std::string &delimiter) {
+  result.clear();
+  if (source.empty()) {
+    return 0;
+  }
+
+  std::string item_str;
+  if (delimiter.empty()) {
+    item_str = trim(source);
+    if (!item_str.empty()) {
+      result.push_back(item_str);
+    }
+    return static_cast<int32_t>(result.size());
+  }
+
+  std::string::size_type begin = 0;
+  for (std::string::size_type end = source.find(delimiter);
+       end != std::string::npos; end = source.find(delimiter, begin)) {
+    item_str = trim(source.substr(begin, end - begin));
+    if (!item_str.empty()) {
+      result.push_back(item_str);
+    }
+    begin = end + delimiter.size();
+  }
+
+  // Whatever follows the last delimiter.
+  if (begin != source.length()) {
+    item_str = trim(source.substr(begin));
+    if (!item_str.empty()) {
+      result.push_back(item_str);
+    }
+  }
+  return static_cast<int32_t>(result.size());
+}
+
+// Concatenates all elements of `vs`, appending ", " after each one (so a
+// non-empty result also ends with ", ").
+std::string Utils::getVectorStr(std::vector<std::string> &vs) {
+  std::string joined;
+  for (std::vector<std::string>::iterator item = vs.begin(); item != vs.end();
+       ++item) {
+    joined += *item + ", ";
+  }
+  return joined;
+}
+
+} // namespace inlong
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.h
new file mode 100644
index 0000000000..4ee96262a0
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/utils.h
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef INLONG_SDK_UTILS_H
+#define INLONG_SDK_UTILS_H
+
+#include <stdint.h>
+#include <string>
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <utility>
+#include <vector>
+
+#include "snappy.h"
+namespace dataproxy_sdk {
+using PAIR = std::pair<std::string, int32_t>;
+struct HttpRequest { // parameters consumed by Utils::requestUrl
+ std::string url; // target of the POST request
+ uint32_t timeout; // handed to CURLOPT_TIMEOUT (libcurl documents it in seconds)
+ bool need_auth; // add the basic-auth header when auth_id and auth_key are non-empty
+ std::string auth_id; // id part of the credential
+ std::string auth_key; // key part of the credential
+ std::string post_data; // request body passed as CURLOPT_POSTFIELDS
+};
+
+/**
+ * Collection of static helper routines: time, snappy compression, id
+ * generation, local address lookup, CPU pinning, Base64 / basic-auth,
+ * HTTP POST via libcurl, file reading and string splitting.
+ *
+ * NOTE(review): in this change the header declares Utils inside namespace
+ * dataproxy_sdk while utils.cc defines the members inside namespace inlong;
+ * confirm which namespace is intended.
+ */
+class Utils {
+private:
+  // State shared by all getSnowflakeId() calls.
+  static char snowflake_id[35];
+  static uint16_t sequence;
+  static uint64_t last_msstamp;
+
+public:
+  /// Waits `sec` seconds using select().
+  static void taskWaitTime(int32_t sec);
+
+  /// Wall-clock time in milliseconds.
+  static uint64_t getCurrentMsTime();
+
+  /// Wall-clock time in microseconds.
+  static uint64_t getCurrentWsTime();
+
+  /// format time: yyyymmddHHMMSS
+  static std::string getFormatTime(uint64_t date_time);
+
+  /// snappy data
+  static size_t zipData(const char *input, uint32_t input_len,
+                        std::string &zip_res);
+
+  /// Returns the id text held in the static snowflake_id buffer.
+  static char *getSnowflakeId();
+
+  /// Stores the first usable local IPv4 address in `local_host`.
+  static bool getFirstIpAddr(std::string &local_host);
+
+  /// True when `report_time` lies strictly between the two fixed bounds.
+  inline static bool isLegalTime(uint64_t report_time) {
+    const bool above_lower = (report_time > 1435101567000LL);
+    const bool below_upper = (report_time < 4103101567000LL);
+    return above_lower && below_upper;
+  }
+
+  /// Sets the CPU affinity of the caller to one core.
+  static bool bindCPU(int32_t cpu_id);
+
+  static std::string base64_encode(const std::string &data);
+  static std::string genBasicAuthCredential(const std::string &id,
+                                            const std::string &key);
+
+  /// POSTs `request` and appends the response body to `res`.
+  static int32_t requestUrl(std::string &res, const HttpRequest *request);
+
+  /// read file content, save as res, return true is success
+  static bool readFile(const std::string &file_path, std::string &content);
+
+  /// Splits `source` on `delimiter` into trimmed, non-empty items.
+  static int32_t splitOperate(const std::string &source,
+                              std::vector<std::string> &result,
+                              const std::string &delimiter);
+
+  /// Joins the elements of `vs`, each followed by ", ".
+  static std::string getVectorStr(std::vector<std::string> &vs);
+
+  /// Comparator ordering pairs by ascending `second`.
+  static bool upValueSort(const PAIR &lhs, const PAIR &rhs) {
+    return rhs.second > lhs.second;
+  }
+
+  /// Comparator ordering pairs by descending `second`.
+  static bool downValueSort(const PAIR &lhs, const PAIR &rhs) {
+    return rhs.second < lhs.second;
+  }
+
+private:
+  /// libcurl write callback used by requestUrl().
+  static size_t getUrlResponse(void *buffer, size_t size, size_t count,
+                               void *response);
+
+  /// Spins until the millisecond clock passes `last_ms`.
+  static int64_t waitNextMills(int64_t last_ms);
+
+  /// Strips leading and trailing whitespace.
+  static std::string trim(const std::string &source);
+};
+
+} // namespace dataproxy_sdk
+
+#endif // INLONG_SDK_UTILS_H
\ No newline at end of file