This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2f147c8c7 [INLONG-5258][SDK] DataProxy SDK(cpp) core interface (#5260)
2f147c8c7 is described below
commit 2f147c8c72fe4da7ae4781c7e32c7fcbeeb3e47d
Author: xueyingzhang <[email protected]>
AuthorDate: Fri Jul 29 22:50:31 2022 +0800
[INLONG-5258][SDK] DataProxy SDK(cpp) core interface (#5260)
---
.../dataproxy-sdk-cpp/release/inc/tc_api.h | 109 ++++++++
.../dataproxy-sdk-cpp/src/base/sdk_core.cc | 298 +++++++++++++++++++++
.../dataproxy-sdk-cpp/src/base/sdk_core.h | 65 +++++
.../dataproxy-sdk-cpp/src/base/user_msg.h | 63 +++++
4 files changed, 535 insertions(+)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/tc_api.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/tc_api.h
new file mode 100644
index 000000000..7cd1738cd
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/tc_api.h
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef CAPI_RELEASE_INC_TC_API_H_
+#define CAPI_RELEASE_INC_TC_API_H_
+
+#include "sdk_core.h"
+#include <stdint.h>
+
+namespace dataproxy_sdk
+{
+// send result
+enum SDKInvalidResult {
+ kMultiInit = 4, // invoke tc_api_init repeatedly in the same process
+ kErrorInit = 5, // tc_api_init error, user should check config file
+ kMsgTooLong = 6, // msg len is more than ext_pack_size
+ kInvalidInput = 7, // empty groupId or streamId or msg body
+ kBufferPoolFull = 8, // sdk buffer is full, should adjust sending
frequency, or increase max_buf_pool/thread_num in config file
+ kFailGetConn = 9, // fail to get proxy connection, check whether
groupId or proxy_cfg_preurl is correct
+ kSendAfterClose = 10, // invoke send function after closing sdk
+ kMultiExits = 11, // invoke tc_api_close function repeatedly
+ kInvalidGroupId =12, // groupId is not set in config
+ kFailGetBufferPool = 13,
+ kFailGetSendBuf = 14,
+ kFailWriteToBuf = 15,
+ kErrorCURL = 16, // request manager error
+ kErrorParseJson = 17,
+ kFailGetPackQueue = 18 // failed to get pack queue
+
+};
+
+/**
+ * @description: Before using sdk to send data, tc_api_init must be invoked
+ * @return 0 if success
+ * @param {char*} config_file, user configfile, prefer using absolute path
+ */
+int32_t tc_api_init(const char* config_file);
+
+/**
+ * @description: tc_api_init ext function
+ * @return 0 if success
+ * @param {char*} config_file - user configfile, prefer using absolute path
+ * @param {int32_t} use_def - is use_def isn't 0,
+ */
+int32_t tc_api_init_ext(const char* config_file, int32_t use_def);
+
+
+/**
+ * @description: send data
+ * @return {*} 0 if success; non-zero means this send failed, refer to
SDKInvalidResult above
+ * @param {char*} inlong_group_id
+ * @param {char*} inlong_stream_id
+ * @param {char*} msg
+ * @param {int32_t} msg_len
+ * @param {UserCallBack} call_back
+ */
+/*
+ * UserCallBack function signature:
+ * return: int32_t
+ * parameters: (const char* inlong_group_id, const char* inlong_stream_id,
const char* msg, int32_t msg_len, const int64_t report_time, const char*
client_ip)
+ */
+int32_t tc_api_send(const char* inlong_group_id, const char* inlong_stream_id,
const char* msg, int32_t msg_len, UserCallBack call_back = NULL);
+
+int32_t tc_api_send_batch(const char* inlong_group_id, const char*
inlong_stream_id, const char** msg_list, int32_t msg_cnt, UserCallBack
call_back = NULL);
+
+
+/**
+ * @description: send data, add msg_time based on tc_api_send
+ * @return 0 if success
+ */
+int32_t tc_api_send_ext(const char* inlong_group_id,
+ const char* inlong_stream_id,
+ const char* msg,
+ int32_t msg_len,
+ const int64_t msg_time,
+ UserCallBack call_back = NULL);
+
+int32_t tc_api_send_base(const char* inlong_group_id,
+ const char* inlong_stream_id,
+ const char* msg,
+ int32_t msg_len,
+ const int64_t report_time,
+ const char* client_ip,
+ UserCallBack call_back = NULL);
+
+/**
+ * @description: close sdk; if sdk is closed, you can't send data any more
+ * @return 0 if success
+ * @param {int32_t} max_waitms, millisecond, waiting data in memory to be sent
+ */
+int32_t tc_api_close(int32_t max_waitms);
+
+} // namespace dataproxy_sdk
+
+#endif // CAPI_RELEASE_INC_TC_API_H_
\ No newline at end of file
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.cc
new file mode 100644
index 000000000..0996b9b0f
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.cc
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
#include "tc_api.h"

#include <algorithm>
#include <chrono>
#include <rapidjson/document.h>
#include <signal.h>
#include <stdio.h>
#include <string>
#include <thread>
#include <unistd.h>
#include <unordered_map>
#include <vector>

#include "atomic.h"
#include "buffer_pool.h"
#include "proxylist_config.h"
#include "sdk_constant.h"
#include "client_config.h"
#include "logger.h"
#include "pack_queue.h"
#include "utils.h"
+
+namespace dataproxy_sdk
+{
+ AtomicInt init_flag{0};
+ AtomicInt user_exit_flag{0};
+ AtomicUInt g_send_msgid{0}; // msg uuid
+ AtomicInt g_buf_full{0}; // used for buf full log limit
+ ClientConfig *g_config = nullptr;
+ GlobalCluster *g_clusters = nullptr;
+ GlobalQueues *g_queues = nullptr;
+ TotalPools *g_pools = nullptr;
+ ExecutorThreadPool *g_executors = nullptr;
+ int32_t g_use_default = 0; // whether use default config to init sdk if
user's config file error
+
+ int32_t tc_api_init(const char *config_file)
+ {
+ getLogger().init(5, 15, Logger::Level(3), 2, true, "./", ".cpplog");
+
+ // one process is only initialized once
+ if (!init_flag.compareAndSwap(0, 1))
+ {
+ LOG_ERROR("dataproxy_sdk_cpp has been initialized before!");
+ return SDKInvalidResult::kMultiInit;
+ }
+ user_exit_flag.getAndSet(0);
+
+ g_config = new ClientConfig(config_file);
+
+ if (!g_config){
+ LOG_ERROR("dataproxy_sdk_cpp init error");
+ return SDKInvalidResult::kErrorInit;
+ }
+ bool res = g_config->parseConfig();
+
+ if (!res){
+ // init error and not allow default init
+ if (g_use_default)
+ {
+ g_config->defaultInit();
+ }
+ else
+ {
+ LOG_ERROR("dataproxy_sdk_cpp init error");
+ return SDKInvalidResult::kErrorInit;
+ }
+ }
+
+ remove("./.cpplog");
+
+ getLogger().init(g_config->log_size_, g_config->log_num_,
Logger::Level(g_config->log_level_), g_config->log_file_type_,
+ g_config->log_enable_limit_, g_config->log_path_);
+
+ LOG_WARN("dataproxy_sdk_cpp start init, config path:%s, version:%s",
config_file, constants::kTDBusCAPIVersion);
+ g_config->showClientConfig();
+
+ // get local ip
+ if (!Utils::getFirstIpAddr(g_config->ser_ip_))
+ {
+ LOG_WARN("not found the localHost in local OS, use user's
ser_ip(%s) in config", g_config->ser_ip_.c_str());
+ }
+
+ if (g_config->enable_setaffinity_)
+ {
+ Utils::bindCPU(g_config->mask_cpu_affinity_);
+ }
+
+ signal(SIGPIPE, SIG_IGN);
+
+ g_pools = new TotalPools();
+ if (!g_pools)
+ {
+ LOG_ERROR("fail to init global buffer pools");
+ return SDKInvalidResult::kErrorInit;
+ }
+ LOG_INFO("buf pools init complete");
+
+ g_clusters = new GlobalCluster();
+ if (!g_clusters)
+ {
+ LOG_ERROR("fail to init global clusterlist");
+ return SDKInvalidResult::kErrorInit;
+ }
+ LOG_INFO("global clusterlist init complete");
+
+ g_queues = new GlobalQueues();
+ if (!g_queues)
+ {
+ LOG_ERROR("fail to init global packqueue");
+ return SDKInvalidResult::kErrorInit;
+ }
+ LOG_INFO("global packqueue init complete");
+
+ // init network threadpools
+ g_executors = new ExecutorThreadPool();
+ if (!g_executors)
+ {
+ LOG_ERROR("fail to init network threads");
+ return SDKInvalidResult::kErrorInit;
+ }
+
+ LOG_INFO("read cache proxylist for disaster tolerance");
+ g_clusters->readCacheBuslist();
+
+ if (!g_config->inlong_group_ids_.empty())
+ {
+ int32_t ret = g_clusters->initBuslistAndCreateConns();
+ // FIXME: improve, return ret to user?
+ }
+
+ // packqueue flush thread
+ g_queues->startCheckSubroutine();
+
+ // proxylist update thread
+ g_clusters->startUpdateSubroutine();
+
+ LOG_WARN("dataproxy_sdk_cpp init complete!");
+
+ return 0;
+ }
+
+ int32_t tc_api_init_ext(const char *config_file, int32_t use_def)
+ {
+ g_use_default = use_def;
+ return tc_api_init(config_file);
+ }
+
+ int32_t sendBaseMsg(const std::string msg,
+ const std::string inlong_group_id,
+ const std::string inlong_stream_id,
+ const std::string client_ip,
+ int64_t report_time,
+ UserCallBack call_back)
+ {
+ // should init first, and close before
+ if (init_flag.get() == 0 || user_exit_flag.get() == 1)
+ {
+ LOG_ERROR("capi has been closed, init first and then send");
+ return SDKInvalidResult::kSendAfterClose;
+ }
+
+ // input check
+ if (msg.empty() || inlong_group_id.empty() || inlong_stream_id.empty())
+ {
+ LOG_ERROR("invalid input, inlong_group_id:%s, inlong_stream_id:%s,
msg:%s", inlong_group_id.c_str(), inlong_stream_id.c_str(), msg.c_str());
+ return SDKInvalidResult::kInvalidInput;
+ }
+
+ // msg len check
+ if (msg.size() > g_config->ext_pack_size_)
+ {
+ LOG_ERROR("msg len is too long, cur msg_len:%d, ext_pack_size:%d",
msg.size(), g_config->ext_pack_size_);
+ return SDKInvalidResult::kMsgTooLong;
+ }
+
+ if(g_config->enable_groupId_isolation_){
+
if(std::find(g_config->inlong_group_ids_.begin(),g_config->inlong_group_ids_.end(),inlong_group_id)==g_config->inlong_group_ids_.end()){
+ LOG_ERROR("inlong_group_id:%s is not specified in config file,
check it", inlong_group_id.c_str());
+ return SDKInvalidResult::kInvalidGroupId;
+ }
+ }
+
+ g_clusters->addBuslist(inlong_group_id);
+
+ if (!g_pools->isPoolAvailable(inlong_group_id))
+ {
+ if (g_buf_full.get() == 0)
+ LOG_ERROR("buf pool is full, send later, datalen:%d",
msg.size());
+ g_buf_full.increment();
+ if (g_buf_full.get() > 100)
+ {
+ g_buf_full.getAndSet(1);
+ LOG_ERROR("buf pool is full 100 times, send later,
datalen:%d", msg.size());
+ }
+
+ return SDKInvalidResult::kBufferPoolFull;
+ }
+
+ //get packqueue
+ auto pack_queue = g_queues->getPackQueue(inlong_group_id,
inlong_stream_id);
+ if (!pack_queue)
+ {
+ LOG_ERROR("fail to get pack queue, inlong_group_id:%s,
inlong_stream_id:%s", inlong_group_id.c_str(), inlong_stream_id.c_str());
+ return SDKInvalidResult::kFailGetPackQueue;
+ }
+
+ return pack_queue->sendMsg(msg, inlong_group_id, inlong_stream_id,
client_ip, report_time, call_back);
+ }
+
+ int32_t tc_api_send(const char *proxyiness_id, const char
*inlong_stream_id, const char *msg, int32_t msg_len, UserCallBack call_back)
+ {
+ int64_t msg_time = Utils::getCurrentMsTime();
+ return sendBaseMsg(msg, proxyiness_id, inlong_stream_id, "", msg_time,
call_back);
+ }
+
+ int32_t tc_api_send_batch(const char *proxyiness_id, const char
*inlong_stream_id, const char **msg_list, int32_t msg_cnt, UserCallBack
call_back)
+ {
+ if (!msg_cnt)
+ return SDKInvalidResult::kInvalidInput;
+ int64_t msg_time = Utils::getCurrentMsTime();
+ int32_t ret = 0;
+ for (size_t i = 0; i < msg_cnt; i++)
+ {
+ ret = sendBaseMsg(msg_list[i], proxyiness_id, inlong_stream_id,
"", msg_time, call_back);
+ if (ret)
+ return ret;
+ }
+ return 0;
+ }
+
+ int32_t tc_api_send_ext(const char *proxyiness_id, const char
*inlong_stream_id, const char *msg, int32_t msg_len, const int64_t msg_time,
UserCallBack call_back)
+ {
+ return sendBaseMsg(msg, proxyiness_id, inlong_stream_id, "", msg_time,
call_back);
+ }
+
+ int32_t tc_api_send_base(const char *proxyiness_id,
+ const char *inlong_stream_id,
+ const char *msg,
+ int32_t msg_len,
+ const int64_t report_time,
+ const char *client_ip,
+ UserCallBack call_back)
+ {
+ return sendBaseMsg(msg, proxyiness_id, inlong_stream_id, client_ip,
report_time, call_back);
+ }
+
+ int32_t tc_api_close(int32_t max_waitms)
+ {
+ // only exit once
+ if (!init_flag.compareAndSwap(1, 0))
+ {
+ LOG_ERROR("dataproxy_sdk_cpp has been closed!");
+ return SDKInvalidResult::kMultiExits;
+ }
+
+ user_exit_flag.getAndSet(1);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(max_waitms));
+
+ g_queues->printTotalAck(); // pring ack msg count of each
groupid+streamid
+
+ delete g_queues;
+ g_queues=nullptr;
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ delete g_executors;
+ g_executors=nullptr;
+ delete g_pools;
+ g_pools=nullptr;
+ delete g_clusters;
+ g_clusters=nullptr;
+ delete g_config;
+ g_config=nullptr;
+
+ // std::this_thread::sleep_for(std::chrono::seconds(5));
+
+ LOG_WARN("close dataproxy_sdk_cpp!");
+
+ return 0;
+ }
+} // namespace dataproxy_sdk
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.h
new file mode 100644
index 000000000..79d3a2a33
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.h
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_CORE_H_
+#define DATAPROXY_SDK_BASE_CORE_H_
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <asio.hpp>
+
+#include "atomic.h"
+#include "user_msg.h"
+
+// #include "buffer_pool.h"
+namespace dataproxy_sdk {
+class ClientConfig;
+class GlobalCluster;
+class GlobalQueues;
+class TotalPools;
+
+class ExecutorThread;
+using ExecutorThreadPtr = std::shared_ptr<ExecutorThread>;
+
+class ExecutorThreadPool;
+
+class Connection;
+using ConnectionPtr = std::shared_ptr<Connection>;
+
+class ProxyInfo;
+using ProxyInfoPtr = std::shared_ptr<ProxyInfo>;
+
+class SendBuffer;
+
+using TcpSocketPtr = std::shared_ptr<asio::ip::tcp::socket>;
+using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>;
+using io_context_work =
asio::executor_work_guard<asio::io_context::executor_type>;
+
+extern AtomicUInt g_send_msgid;
+extern ClientConfig* g_config;
+extern GlobalCluster* g_clusters;
+extern GlobalQueues* g_queues;
+extern TotalPools* g_pools;
+extern ExecutorThreadPool* g_executors;
+extern AtomicInt user_exit_flag;
+} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_CORE_H_
\ No newline at end of file
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/user_msg.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/user_msg.h
new file mode 100644
index 000000000..49dbea42e
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/user_msg.h
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_USER_MSG_H_
+#define DATAPROXY_SDK_BASE_USER_MSG_H_
+
#include <functional>
#include <memory>
#include <stdint.h>
#include <string>
#include <utility>

namespace dataproxy_sdk
{
// UserCallBack signature, in argument order:
// inlong_group_id, inlong_stream_id, msg, msg_len, report_time, client_ip
using UserCallBack = std::function<int32_t(const char*, const char*, const char*, int32_t, const int64_t, const char*)>;

// One user message together with the metadata it was submitted with.
struct UserMsg
{
    std::string msg;
    std::string client_ip;
    int64_t report_time;
    UserCallBack cb;

    int64_t user_report_time;
    std::string user_client_ip;

    // e.g. "__addcol1__reptime=" + Utils::getFormatTime(data_time_) + "&__addcol2__ip=" + client_ip
    std::string data_pack_format_attr;

    UserMsg(const std::string& mmsg,
            const std::string& mclient_ip,
            int64_t mreport_time,
            UserCallBack mcb,
            const std::string& attr,
            const std::string& u_ip,
            int64_t u_time)
        // Members are always initialized in declaration order; the list
        // follows that order (avoids -Wreorder). The callback is taken by
        // value and moved in to avoid a second std::function copy.
        : msg(mmsg)
        , client_ip(mclient_ip)
        , report_time(mreport_time)
        , cb(std::move(mcb))
        , user_report_time(u_time)
        , user_client_ip(u_ip)
        , data_pack_format_attr(attr)
    {
    }
};
using UserMsgPtr = std::shared_ptr<UserMsg>;

} // namespace dataproxy_sdk
+
+#endif // DATAPROXY_SDK_BASE_USER_MSG_H_
\ No newline at end of file