This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f147c8c7 [INLONG-5258][SDK] DataProxy SDK(cpp) core interface (#5260)
2f147c8c7 is described below

commit 2f147c8c72fe4da7ae4781c7e32c7fcbeeb3e47d
Author: xueyingzhang <[email protected]>
AuthorDate: Fri Jul 29 22:50:31 2022 +0800

    [INLONG-5258][SDK] DataProxy SDK(cpp) core interface (#5260)
---
 .../dataproxy-sdk-cpp/release/inc/tc_api.h         | 109 ++++++++
 .../dataproxy-sdk-cpp/src/base/sdk_core.cc         | 298 +++++++++++++++++++++
 .../dataproxy-sdk-cpp/src/base/sdk_core.h          |  65 +++++
 .../dataproxy-sdk-cpp/src/base/user_msg.h          |  63 +++++
 4 files changed, 535 insertions(+)

diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/tc_api.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/tc_api.h
new file mode 100644
index 000000000..7cd1738cd
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/tc_api.h
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef CAPI_RELEASE_INC_TC_API_H_
+#define CAPI_RELEASE_INC_TC_API_H_
+
+#include "sdk_core.h"
+#include <stdint.h>
+
+namespace dataproxy_sdk
+{
+// Non-zero result codes that the tc_api_* functions may return (0 means success)
+enum SDKInvalidResult {
+    kMultiInit         = 4,   // invoke tc_api_init repeatedly in the same process
+    kErrorInit         = 5,   // tc_api_init error, user should check config file
+    kMsgTooLong        = 6,   // msg len is more than ext_pack_size
+    kInvalidInput      = 7,   // empty groupId or streamId or msg body
+    // sdk buffer is full, should adjust sending frequency, or increase max_buf_pool/thread_num in config file
+    kBufferPoolFull    = 8,
+    // fail to get proxy connection, check whether groupId or proxy_cfg_preurl is correct
+    kFailGetConn       = 9,
+    kSendAfterClose    = 10,  // invoke send function after closing sdk
+    kMultiExits        = 11,  // invoke tc_api_close function repeatedly
+    kInvalidGroupId    = 12,  // groupId is not set in config
+    kFailGetBufferPool = 13,
+    kFailGetSendBuf    = 14,
+    kFailWriteToBuf    = 15,
+    kErrorCURL         = 16,  // request manager error
+    kErrorParseJson    = 17,
+    kFailGetPackQueue  = 18   // failed to get pack queue
+};
+
+/**
+ * @description: Before using sdk to send data, tc_api_init must be invoked
+ * @return 0 if success; kMultiInit if the sdk has already been initialized in this process;
+ *         kErrorInit if the config file cannot be parsed
+ * @param {char*} config_file, user config file, prefer using absolute path
+ */
+int32_t tc_api_init(const char* config_file);
+
+/**
+ * @description: tc_api_init ext function, lets the caller allow a fallback to the default config
+ * @return 0 if success, otherwise the same error codes as tc_api_init
+ * @param {char*} config_file - user config file, prefer using absolute path
+ * @param {int32_t} use_def - if use_def isn't 0, the sdk is initialized with its default
+ *        config when the user's config file cannot be parsed, instead of returning kErrorInit
+ */
+int32_t tc_api_init_ext(const char* config_file, int32_t use_def);
+
+
+/**
+ * @description: send one message
+ * @return {*} 0 if success; non-zero means this send failed, refer to SDKInvalidResult above
+ * @param {char*} inlong_group_id
+ * @param {char*} inlong_stream_id
+ * @param {char*} msg, message body
+ * @param {int32_t} msg_len, length of msg
+ * @param {UserCallBack} call_back, optional user callback, may be left empty
+ */
+/*
+ * UserCallBack function signature:
+ *  return: int32_t
+ *  parameters: (const char* inlong_group_id, const char* inlong_stream_id,
+ *               const char* msg, int32_t msg_len, const int64_t report_time,
+ *               const char* client_ip)
+ */
+int32_t tc_api_send(const char* inlong_group_id, const char* inlong_stream_id, 
const char* msg, int32_t msg_len, UserCallBack call_back = NULL);
+
+/**
+ * @description: send msg_cnt messages to the same groupId/streamId, one after another
+ * @return 0 if every message was accepted; kInvalidInput if msg_cnt is 0; otherwise the
+ *         result of the first message that failed (later messages are not sent)
+ * @param {char**} msg_list, array of msg_cnt C strings; no per-message length is passed,
+ *        so every entry is read up to its terminating '\0'
+ * @param {int32_t} msg_cnt, number of entries in msg_list
+ * @param {UserCallBack} call_back, optional user callback shared by all messages
+ */
+int32_t tc_api_send_batch(const char* inlong_group_id, const char* 
inlong_stream_id, const char** msg_list, int32_t msg_cnt, UserCallBack 
call_back = NULL);
+
+
+/**
+ * @description: send data, add msg_time based on tc_api_send
+ * @return 0 if success; non-zero means this send failed, refer to SDKInvalidResult above
+ * @param {int64_t} msg_time, caller-supplied time of the message, used instead of the
+ *        sdk's current time that tc_api_send fills in
+ */
+int32_t tc_api_send_ext(const char* inlong_group_id,
+                        const char* inlong_stream_id,
+                        const char* msg,
+                        int32_t msg_len,
+                        const int64_t msg_time,
+                        UserCallBack call_back = NULL);
+
+/**
+ * @description: send data with both the report time and the client ip supplied by the caller
+ * @return 0 if success; non-zero means this send failed, refer to SDKInvalidResult above
+ * @param {int64_t} report_time, caller-supplied time of the message
+ * @param {char*} client_ip, caller-supplied client ip (the other send functions pass an empty ip)
+ */
+int32_t tc_api_send_base(const char* inlong_group_id,
+                         const char* inlong_stream_id,
+                         const char* msg,
+                         int32_t msg_len,
+                         const int64_t report_time,
+                         const char* client_ip,
+                         UserCallBack call_back = NULL);
+
+/**
+ * @description: close sdk; if sdk is closed, you can't send data any more
+ * @return 0 if success; kMultiExits if the sdk is not initialized or has already been closed
+ * @param {int32_t} max_waitms, milliseconds to wait for data still in memory to be sent
+ *        before the sdk releases its resources
+ */
+int32_t tc_api_close(int32_t max_waitms);
+
+}  // namespace dataproxy_sdk
+
+#endif  // CAPI_RELEASE_INC_TC_API_H_
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.cc 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.cc
new file mode 100644
index 000000000..0996b9b0f
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.cc
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "tc_api.h"
+
+#include <chrono>
+#include <rapidjson/document.h>
+#include <signal.h>
+#include <stdio.h>
+#include <string>
+#include <unistd.h>
+#include <unordered_map>
+#include <vector>
+#include <algorithm>
+
+#include "atomic.h"
+#include "buffer_pool.h"
+#include "proxylist_config.h"
+#include "sdk_constant.h"
+#include "client_config.h"
+#include "logger.h"
+#include "pack_queue.h"
+#include "utils.h"
+
+namespace dataproxy_sdk
+{
+    AtomicInt init_flag{0};
+    AtomicInt user_exit_flag{0};
+    AtomicUInt g_send_msgid{0}; // msg uuid
+    AtomicInt g_buf_full{0};    // used for buf full log limit
+    ClientConfig *g_config = nullptr;
+    GlobalCluster *g_clusters = nullptr;
+    GlobalQueues *g_queues = nullptr;
+    TotalPools *g_pools = nullptr;
+    ExecutorThreadPool *g_executors = nullptr;
+    int32_t g_use_default = 0; // whether use default config to init sdk if 
user's config file error
+
+    /**
+     * @description: initialize the sdk; must succeed before any send function is used
+     * @return 0 if success; kMultiInit if the sdk is already initialized in this process;
+     *         kErrorInit if the config file cannot be parsed and default init is not enabled.
+     *         After kErrorInit the sdk is back in the uninitialized state, so init can be retried
+     * @param {char*} config_file, user config file path; nullptr is handled like an empty path
+     */
+    int32_t tc_api_init(const char *config_file)
+    {
+        // temporary logger (path "./", file ".cpplog") used until the user's log settings are loaded
+        getLogger().init(5, 15, Logger::Level(3), 2, true, "./", ".cpplog");
+
+        // one process is only initialized once
+        if (!init_flag.compareAndSwap(0, 1))
+        {
+            LOG_ERROR("dataproxy_sdk_cpp has been initialized before!");
+            return SDKInvalidResult::kMultiInit;
+        }
+        user_exit_flag.getAndSet(0);
+
+        // a null pointer must never reach a %s conversion; an empty path simply fails to parse
+        const char *config_path = (config_file != nullptr) ? config_file : "";
+
+        // plain new reports failure by throwing std::bad_alloc, it never returns nullptr,
+        // so the allocations in this function are not followed by null checks
+        g_config = new ClientConfig(config_path);
+
+        if (!g_config->parseConfig())
+        {
+            if (g_use_default)
+            {
+                // user allowed falling back to the default config
+                g_config->defaultInit();
+            }
+            else
+            {
+                LOG_ERROR("dataproxy_sdk_cpp init error");
+                // roll back: otherwise init_flag stays 1, a retry is refused with kMultiInit and
+                // send/close would go on to use globals that were never created
+                delete g_config;
+                g_config = nullptr;
+                init_flag.getAndSet(0);
+                return SDKInvalidResult::kErrorInit;
+            }
+        }
+
+        // drop the temporary log file and switch to the configured logger
+        remove("./.cpplog");
+
+        getLogger().init(g_config->log_size_, g_config->log_num_, Logger::Level(g_config->log_level_), g_config->log_file_type_,
+                         g_config->log_enable_limit_, g_config->log_path_);
+
+        LOG_WARN("dataproxy_sdk_cpp start init, config path:%s, version:%s", config_path, constants::kTDBusCAPIVersion);
+        g_config->showClientConfig();
+
+        // get local ip
+        if (!Utils::getFirstIpAddr(g_config->ser_ip_))
+        {
+            LOG_WARN("not found the localHost in local OS, use user's ser_ip(%s) in config", g_config->ser_ip_.c_str());
+        }
+
+        if (g_config->enable_setaffinity_)
+        {
+            Utils::bindCPU(g_config->mask_cpu_affinity_);
+        }
+
+        // writing to a closed connection must not kill the host process
+        signal(SIGPIPE, SIG_IGN);
+
+        g_pools = new TotalPools();
+        LOG_INFO("buf pools init complete");
+
+        g_clusters = new GlobalCluster();
+        LOG_INFO("global clusterlist init complete");
+
+        g_queues = new GlobalQueues();
+        LOG_INFO("global packqueue init complete");
+
+        // init network threadpools
+        g_executors = new ExecutorThreadPool();
+
+        LOG_INFO("read cache proxylist for disaster tolerance");
+        g_clusters->readCacheBuslist();
+
+        if (!g_config->inlong_group_ids_.empty())
+        {
+            // FIXME: improve, return the result to user?
+            g_clusters->initBuslistAndCreateConns();
+        }
+
+        // packqueue flush thread
+        g_queues->startCheckSubroutine();
+
+        // proxylist update thread
+        g_clusters->startUpdateSubroutine();
+
+        LOG_WARN("dataproxy_sdk_cpp init complete!");
+
+        return 0;
+    }
+
+    // tc_api_init variant: use_def is stored in g_use_default before the normal init runs,
+    // so a non-zero value lets tc_api_init fall back to the default config
+    int32_t tc_api_init_ext(const char *config_file, int32_t use_def)
+    {
+        g_use_default = use_def;
+
+        const int32_t init_result = tc_api_init(config_file);
+        return init_result;
+    }
+
+    /**
+     * Common send path behind the tc_api_send* functions.
+     * Checks, in this order: sdk state (kSendAfterClose), empty msg/groupId/streamId
+     * (kInvalidInput), msg size against ext_pack_size_ (kMsgTooLong), groupId isolation
+     * (kInvalidGroupId), buffer pool availability (kBufferPoolFull) and the pack queue
+     * lookup (kFailGetPackQueue); then hands the message to the pack queue and returns
+     * the pack queue's result.
+     */
+    int32_t sendBaseMsg(const std::string msg,
+                        const std::string inlong_group_id,
+                        const std::string inlong_stream_id,
+                        const std::string client_ip,
+                        int64_t report_time,
+                        UserCallBack call_back)
+    {
+        // should init first, and close before
+        if (init_flag.get() == 0 || user_exit_flag.get() == 1)
+        {
+            LOG_ERROR("capi has been closed, init first and then send");
+            return SDKInvalidResult::kSendAfterClose;
+        }
+
+        // input check
+        if (msg.empty() || inlong_group_id.empty() || inlong_stream_id.empty())
+        {
+            LOG_ERROR("invalid input, inlong_group_id:%s, inlong_stream_id:%s, msg:%s", inlong_group_id.c_str(), inlong_stream_id.c_str(), msg.c_str());
+            return SDKInvalidResult::kInvalidInput;
+        }
+
+        // the log formats below use %d, so the size_t length is converted once instead of
+        // being passed to the variadic log call with a mismatching type
+        const int32_t data_len = static_cast<int32_t>(msg.size());
+
+        // msg len check
+        if (msg.size() > g_config->ext_pack_size_)
+        {
+            LOG_ERROR("msg len is too long, cur msg_len:%d, ext_pack_size:%d", data_len, g_config->ext_pack_size_);
+            return SDKInvalidResult::kMsgTooLong;
+        }
+
+        // with isolation enabled only the groupIds listed in the config may be used
+        if (g_config->enable_groupId_isolation_)
+        {
+            if (std::find(g_config->inlong_group_ids_.begin(), g_config->inlong_group_ids_.end(), inlong_group_id) == g_config->inlong_group_ids_.end())
+            {
+                LOG_ERROR("inlong_group_id:%s is not specified in config file, check it", inlong_group_id.c_str());
+                return SDKInvalidResult::kInvalidGroupId;
+            }
+        }
+
+        g_clusters->addBuslist(inlong_group_id);
+
+        if (!g_pools->isPoolAvailable(inlong_group_id))
+        {
+            // log limit: report the first rejection, then once more each time the counter passes 100
+            if (g_buf_full.get() == 0)
+                LOG_ERROR("buf pool is full, send later, datalen:%d", data_len);
+            g_buf_full.increment();
+            if (g_buf_full.get() > 100)
+            {
+                g_buf_full.getAndSet(1);
+                LOG_ERROR("buf pool is full 100 times, send later, datalen:%d", data_len);
+            }
+
+            return SDKInvalidResult::kBufferPoolFull;
+        }
+
+        // get packqueue
+        auto pack_queue = g_queues->getPackQueue(inlong_group_id, inlong_stream_id);
+        if (!pack_queue)
+        {
+            LOG_ERROR("fail to get pack queue, inlong_group_id:%s, inlong_stream_id:%s", inlong_group_id.c_str(), inlong_stream_id.c_str());
+            return SDKInvalidResult::kFailGetPackQueue;
+        }
+
+        return pack_queue->sendMsg(msg, inlong_group_id, inlong_stream_id, client_ip, report_time, call_back);
+    }
+
+    // Checks the raw C arguments of a single-message send before any std::string is built
+    // from them: constructing std::string from a null pointer is undefined behaviour.
+    // Logs and returns false on a null pointer or a non-positive msg_len.
+    static bool isValidRawMsg(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len)
+    {
+        if (inlong_group_id != nullptr && inlong_stream_id != nullptr && msg != nullptr && msg_len > 0)
+        {
+            return true;
+        }
+        LOG_ERROR("invalid input, null inlong_group_id/inlong_stream_id/msg or msg_len <= 0");
+        return false;
+    }
+
+    /**
+     * Send one message stamped with the sdk's current time and an empty client ip.
+     * Exactly msg_len bytes of msg are sent, so the body may contain '\0' bytes.
+     * Returns kInvalidInput for null pointers or msg_len <= 0, otherwise the result of sendBaseMsg.
+     */
+    int32_t tc_api_send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, UserCallBack call_back)
+    {
+        if (!isValidRawMsg(inlong_group_id, inlong_stream_id, msg, msg_len))
+            return SDKInvalidResult::kInvalidInput;
+
+        int64_t msg_time = Utils::getCurrentMsTime();
+        return sendBaseMsg(std::string(msg, static_cast<size_t>(msg_len)), inlong_group_id, inlong_stream_id, "", msg_time, call_back);
+    }
+
+    /**
+     * Send msg_cnt '\0'-terminated messages, all stamped with the same current time.
+     * The whole batch is rejected with kInvalidInput when a pointer is null or msg_cnt <= 0.
+     * Sending stops at the first message that fails and that result is returned.
+     */
+    int32_t tc_api_send_batch(const char *inlong_group_id, const char *inlong_stream_id, const char **msg_list, int32_t msg_cnt, UserCallBack call_back)
+    {
+        if (inlong_group_id == nullptr || inlong_stream_id == nullptr || msg_list == nullptr || msg_cnt <= 0)
+            return SDKInvalidResult::kInvalidInput;
+
+        // validate every entry up front so a null entry cannot cause a half-sent batch
+        for (int32_t i = 0; i < msg_cnt; ++i)
+        {
+            if (msg_list[i] == nullptr)
+                return SDKInvalidResult::kInvalidInput;
+        }
+
+        int64_t msg_time = Utils::getCurrentMsTime();
+        // signed index: a size_t index compared with the signed msg_cnt would turn a
+        // negative count into a huge bound
+        for (int32_t i = 0; i < msg_cnt; ++i)
+        {
+            int32_t ret = sendBaseMsg(msg_list[i], inlong_group_id, inlong_stream_id, "", msg_time, call_back);
+            if (ret)
+                return ret;
+        }
+        return 0;
+    }
+
+    /**
+     * Same as tc_api_send, but the message time is supplied by the caller.
+     */
+    int32_t tc_api_send_ext(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, const int64_t msg_time, UserCallBack call_back)
+    {
+        if (!isValidRawMsg(inlong_group_id, inlong_stream_id, msg, msg_len))
+            return SDKInvalidResult::kInvalidInput;
+
+        return sendBaseMsg(std::string(msg, static_cast<size_t>(msg_len)), inlong_group_id, inlong_stream_id, "", msg_time, call_back);
+    }
+
+    /**
+     * Same as tc_api_send, but report time and client ip are supplied by the caller.
+     * A null client_ip is treated as an empty ip, which is what the other send functions pass.
+     */
+    int32_t tc_api_send_base(const char *inlong_group_id,
+                             const char *inlong_stream_id,
+                             const char *msg,
+                             int32_t msg_len,
+                             const int64_t report_time,
+                             const char *client_ip,
+                             UserCallBack call_back)
+    {
+        if (!isValidRawMsg(inlong_group_id, inlong_stream_id, msg, msg_len))
+            return SDKInvalidResult::kInvalidInput;
+
+        const char *ip = (client_ip != nullptr) ? client_ip : "";
+        return sendBaseMsg(std::string(msg, static_cast<size_t>(msg_len)), inlong_group_id, inlong_stream_id, ip, report_time, call_back);
+    }
+
+    // Deletes one heap-allocated sdk global and resets its pointer to nullptr
+    template <typename T>
+    static void destroyGlobal(T *&global)
+    {
+        delete global;
+        global = nullptr;
+    }
+
+    /**
+     * Close the sdk: reject further sends, wait max_waitms milliseconds for data still in
+     * memory, then release the global objects. Returns 0 on success and kMultiExits when
+     * the sdk is not initialized or has already been closed.
+     */
+    int32_t tc_api_close(int32_t max_waitms)
+    {
+        // only exit once: the close is valid only if it flips init_flag from 1 back to 0
+        const bool closing_now = init_flag.compareAndSwap(1, 0);
+        if (!closing_now)
+        {
+            LOG_ERROR("dataproxy_sdk_cpp has been closed!");
+            return SDKInvalidResult::kMultiExits;
+        }
+
+        user_exit_flag.getAndSet(1);
+
+        // let data that is still in memory be sent
+        const std::chrono::milliseconds drain_time(max_waitms);
+        std::this_thread::sleep_for(drain_time);
+
+        // print ack msg count of each groupid+streamid
+        g_queues->printTotalAck();
+
+        // teardown order matters: queues first, a one second pause, then the rest
+        destroyGlobal(g_queues);
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+        destroyGlobal(g_executors);
+        destroyGlobal(g_pools);
+        destroyGlobal(g_clusters);
+        destroyGlobal(g_config);
+
+        LOG_WARN("close dataproxy_sdk_cpp!");
+
+        return 0;
+    }
+} // namespace dataproxy_sdk
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.h
new file mode 100644
index 000000000..79d3a2a33
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/sdk_core.h
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_CORE_H_
+#define DATAPROXY_SDK_BASE_CORE_H_
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <asio.hpp>
+
+#include "atomic.h"
+#include "user_msg.h"
+
+// #include "buffer_pool.h"
+namespace dataproxy_sdk {
+// Forward declarations of the sdk's core classes, so that this header can declare
+// pointers to them without including their definitions.
+class ClientConfig;
+class GlobalCluster;
+class GlobalQueues;
+class TotalPools;
+
+class ExecutorThread;
+using ExecutorThreadPtr = std::shared_ptr<ExecutorThread>;
+
+class ExecutorThreadPool;
+
+class Connection;
+using ConnectionPtr = std::shared_ptr<Connection>;
+
+class ProxyInfo;
+using ProxyInfoPtr = std::shared_ptr<ProxyInfo>;
+
+class SendBuffer;
+
+// shared-pointer / work-guard aliases for the asio types used by the sdk
+using TcpSocketPtr = std::shared_ptr<asio::ip::tcp::socket>;
+using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>;
+using io_context_work = 
asio::executor_work_guard<asio::io_context::executor_type>;
+
+// sdk-wide globals, defined in sdk_core.cc; the pointers are allocated in tc_api_init
+// and deleted (and reset to nullptr) in tc_api_close
+extern AtomicUInt          g_send_msgid;
+extern ClientConfig*       g_config;
+extern GlobalCluster*      g_clusters;
+extern GlobalQueues*       g_queues;
+extern TotalPools*         g_pools;
+extern ExecutorThreadPool* g_executors;
+extern AtomicInt           user_exit_flag;
+}  // namespace dataproxy_sdk
+
+#endif  // DATAPROXY_SDK_BASE_CORE_H_
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/user_msg.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/user_msg.h
new file mode 100644
index 000000000..49dbea42e
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/base/user_msg.h
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATAPROXY_SDK_BASE_USER_MSG_H_
+#define DATAPROXY_SDK_BASE_USER_MSG_H_
+
+#include <stdint.h>
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <utility>
+namespace dataproxy_sdk
+{
+// UserCallBack signature: inlong_group_id, inlong_stream_id, msg, msg_len, report_time, client_ip
+using UserCallBack = std::function<int32_t(const char*, const char*, const char*, int32_t, const int64_t, const char*)>;
+
+// One user message together with the metadata passed along with it.
+// NOTE(review): the split between client_ip/report_time and the user_* fields is not
+// visible from this header - confirm against the code that builds UserMsg.
+struct UserMsg
+{
+    std::string msg;
+    std::string client_ip;
+    int64_t report_time;
+    UserCallBack cb;
+
+    int64_t user_report_time;
+    std::string user_client_ip;
+
+    // "__addcol1__reptime=" + Utils::getFormatTime(data_time_) + "&__addcol2__ip=" + client_ip
+    std::string data_pack_format_attr;
+
+    // Members are initialized in declaration order (the order the compiler uses anyway),
+    // and the by-value callback is moved into place instead of being copied a second time.
+    UserMsg(const std::string& mmsg,
+            const std::string& mclient_ip,
+            int64_t mreport_time,
+            UserCallBack mcb,
+            const std::string& attr,
+            const std::string& u_ip,
+            int64_t u_time)
+        : msg(mmsg)
+        , client_ip(mclient_ip)
+        , report_time(mreport_time)
+        , cb(std::move(mcb))
+        , user_report_time(u_time)
+        , user_client_ip(u_ip)
+        , data_pack_format_attr(attr)
+    {
+    }
+};
+using UserMsgPtr = std::shared_ptr<UserMsg>;
+
+}  // namespace dataproxy_sdk
+
+#endif  // DATAPROXY_SDK_BASE_USER_MSG_H_
\ No newline at end of file

Reply via email to