This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 47c710b28c [INLONG-10793][SDK] Added metric management for DataProxy
CPP SDK (#10815)
47c710b28c is described below
commit 47c710b28ce044f9a6e695273c55ea20b72d574e
Author: doleyzi <[email protected]>
AuthorDate: Mon Aug 19 20:10:18 2024 +0800
[INLONG-10793][SDK] Added metric management for DataProxy CPP SDK (#10815)
---
.../dataproxy-sdk-cpp/CMakeLists.txt | 3 +-
.../dataproxy-sdk-cpp/src/core/api_imp.cc | 4 +-
.../src/manager/metric_manager.cc | 64 +++++++++
.../dataproxy-sdk-cpp/src/manager/metric_manager.h | 93 +++++++++++++
.../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 1 +
.../dataproxy-sdk-cpp/src/metric/environment.h | 48 +++++++
.../dataproxy-sdk-cpp/src/metric/metric.h | 151 +++++++++++++++++++++
.../dataproxy-sdk-cpp/src/utils/CMakeLists.txt | 7 +-
.../dataproxy-sdk-cpp/src/utils/capi_constant.h | 1 +
9 files changed, 368 insertions(+), 4 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/CMakeLists.txt
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/CMakeLists.txt
index ccbab35336..2a3183fff3 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/CMakeLists.txt
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/CMakeLists.txt
@@ -33,6 +33,7 @@ include_directories(src/manager)
include_directories(src/group)
include_directories(src/protocol)
include_directories(src/client)
+include_directories(src/metric)
link_directories(${PROJECT_SOURCE_DIR}/third_party/lib)
link_directories(${PROJECT_SOURCE_DIR}/third_party/lib64)
@@ -57,7 +58,7 @@ aux_source_directory(src/protocol PROTOCOL)
aux_source_directory(src/client CLIENT)
# static library
-add_library(dataproxy_sdk STATIC ${UTILS} ${CONFIGS} ${CORE} ${MANAGER}
${GROUP} ${PROTOCOL} ${CLIENT})
+add_library(dataproxy_sdk STATIC ${UTILS} ${CONFIGS} ${CORE} ${MANAGER}
${GROUP} ${PROTOCOL} ${CLIENT} ${METRIC})
set_target_properties(dataproxy_sdk PROPERTIES OUTPUT_NAME "dataproxy_sdk"
PREFIX "")
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
index c4b493b068..13ce7223b5 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc
@@ -25,6 +25,8 @@
#include <iostream>
#include <signal.h>
+#include "metric_manager.h"
+
namespace inlong {
int32_t ApiImp::InitApi(const char *config_file_path) {
if (!__sync_bool_compare_and_swap(&inited_, false, true)) {
@@ -104,7 +106,7 @@ int32_t ApiImp::DoInit() {
LOG_INFO("inlong dataproxy cpp sdk Init complete!");
ProxyManager::GetInstance()->Init();
- ProxyManager::GetInstance()->ReadLocalCache();
+ MetricManager::GetInstance()->Init();
for (int i = 0; i < SdkConfig::getInstance()->inlong_group_ids_.size(); i++)
{
LOG_INFO("DoInit CheckConf inlong_group_id:"
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc
new file mode 100644
index 0000000000..061abc0678
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "metric_manager.h"
+
+#include <rapidjson/document.h>
+#include <sys/prctl.h>
+#include <unistd.h>
+
+#include "../utils/logger.h"
+#include "../utils/utils.h"
+#include "../utils/capi_constant.h"
+
+namespace inlong {
+/**
+ * Starts metric reporting. Only the first call has any effect: it records the
+ * process environment and then launches the background reporting thread.
+ * Later calls return immediately.
+ */
+void MetricManager::Init() {
+  if (!__sync_bool_compare_and_swap(&inited_, false, true)) {
+    return;
+  }
+  // Populate environment_ before the thread exists: Run() calls PrintMetric()
+  // right away, and PrintMetric() reads environment_ without taking a lock.
+  InitEnvironment();
+  update_thread_ = std::thread(&MetricManager::Run, this);
+}
+/**
+ * Fills environment_ with the SDK type tag, the SDK version constant, the
+ * current process id and the local ip taken from SdkConfig.
+ */
+void MetricManager::InitEnvironment() {
+  const auto process_id = getpid();
+  environment_.setIp(SdkConfig::getInstance()->local_ip_);
+  environment_.setPid(process_id);
+  environment_.setVersion(constants::kVersion);
+  environment_.setType("cpp");
+}
+/**
+ * Body of the reporting thread: names the thread, then logs the metrics once
+ * per constants::kMetricIntervalMinutes until running_ is cleared.
+ */
+void MetricManager::Run() {
+  prctl(PR_SET_NAME, "metric-manager");
+  const auto report_interval = std::chrono::minutes(constants::kMetricIntervalMinutes);
+  const auto poll_slice = std::chrono::seconds(1);
+  while (running_) {
+    LOG_INFO("Start report metric");
+    PrintMetric();
+
+    // Sleep in short slices instead of one long sleep so that a cleared
+    // running_ flag is noticed within about a second; otherwise the join()
+    // in the destructor would stall for up to the whole report interval.
+    const auto wake_at = std::chrono::steady_clock::now() + report_interval;
+    while (running_ && std::chrono::steady_clock::now() < wake_at) {
+      std::this_thread::sleep_for(poll_slice);
+    }
+  }
+}
+/**
+ * Logs the environment line followed by one line per stat key.
+ *
+ * The accumulated map is swapped out under the lock, so stat_map_ is empty
+ * again afterwards and the logging itself runs without holding mutex_.
+ */
+void MetricManager::PrintMetric() {
+  MetricMap snapshot;
+  {
+    std::lock_guard<std::mutex> lck(mutex_);
+    snapshot.swap(stat_map_);
+  }
+
+  LOG_INFO("[MetricManager] Environment info: " << environment_.ToString());
+
+  // Bind by const reference: iterating by value would copy every key string
+  // and Metric just to print them.
+  for (const auto &entry : snapshot) {
+    LOG_INFO("[MetricManager] Metric info: " << entry.first << " " << entry.second.ToString());
+  }
+}
+} // namespace inlong
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h
new file mode 100644
index 0000000000..5dc013f2d5
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <atomic>
+#include <mutex>
+#include <queue>
+#include <string>
+#include <thread>
+#include <unordered_map>
+
+#include "../config/sdk_conf.h"
+#include "../metric/environment.h"
+#include "../metric/metric.h"
+
+#ifndef INLONG_METRIC_MANAGER_H
+#define INLONG_METRIC_MANAGER_H
+namespace inlong {
+// Maps a stat key (see MetricManager::BuildStatKey) to its accumulated Metric.
+using MetricMap = std::unordered_map<std::string, Metric>;
+// Separator placed between the group id and the stream id inside a stat key.
+static const char kStatJoiner = ' ';
+/**
+ * Process-wide singleton that accumulates Metric counters per stat key and
+ * logs them from a background thread (see Run() and PrintMetric()).
+ *
+ * All access to stat_map_ goes through mutex_, so the Update/Add methods may
+ * be called from several threads.
+ */
+class MetricManager {
+ private:
+  mutable std::mutex mutex_;  // guards stat_map_
+  MetricMap stat_map_;
+  std::thread update_thread_;
+  // Flipped with __sync_bool_compare_and_swap in Init(), which operates on a
+  // plain bool object, so this one intentionally stays a volatile bool.
+  volatile bool inited_ = false;
+  // Cleared by the destructor while the reporting thread is polling it, so it
+  // has to be atomic to avoid a data race.
+  std::atomic<bool> running_{true};
+  Environment environment_;
+  // NOTE(review): not referenced anywhere in the visible code - confirm
+  // whether it is still needed.
+  std::string coreParma_;
+
+  MetricManager() = default;
+
+ public:
+  // The singleton must never be copied.
+  MetricManager(const MetricManager &) = delete;
+  MetricManager &operator=(const MetricManager &) = delete;
+
+  /** Returns the single shared instance, created on first use. */
+  static MetricManager *GetInstance() {
+    static MetricManager instance;
+    return &instance;
+  }
+
+  void Init();
+  void InitEnvironment();
+  void PrintMetric();
+  void Run();
+
+  /** Merges `stat` into the entry for `stat_key`, creating the entry if needed. */
+  void UpdateMetric(const std::string &stat_key, const Metric &stat) {
+    std::lock_guard<std::mutex> lck(mutex_);
+    stat_map_[stat_key].Update(stat);
+  }
+
+  /** Adds `count` to the receive-buffer-full counter of the group/stream pair. */
+  void AddReceiveBufferFullCount(const std::string &inlong_group_id,
+                                 const std::string &inlong_stream_id, uint64_t count) {
+    // The key is built before locking to keep the critical section short.
+    const std::string stat_key = BuildStatKey(inlong_group_id, inlong_stream_id);
+    std::lock_guard<std::mutex> lck(mutex_);
+    stat_map_[stat_key].AddReceiveBufferFullCount(count);
+  }
+
+  /** Adds `count` to the too-long-message counter of the group/stream pair. */
+  void AddTooLongMsgCount(const std::string &inlong_group_id,
+                          const std::string &inlong_stream_id, uint64_t count) {
+    const std::string stat_key = BuildStatKey(inlong_group_id, inlong_stream_id);
+    std::lock_guard<std::mutex> lck(mutex_);
+    stat_map_[stat_key].AddTooLongMsgCount(count);
+  }
+
+  /** Adds `count` to the metadata-failure counter of the group/stream pair. */
+  void AddMetadataFailCount(const std::string &inlong_group_id,
+                            const std::string &inlong_stream_id, uint64_t count) {
+    const std::string stat_key = BuildStatKey(inlong_group_id, inlong_stream_id);
+    std::lock_guard<std::mutex> lck(mutex_);
+    stat_map_[stat_key].AddMetadataFailCount(count);
+  }
+
+  // NOTE(review): no definition of Reset() is visible in this change; confirm
+  // it is implemented before calling it.
+  void Reset();
+
+  /** Returns "<group id><kStatJoiner><stream id>", the key used in stat_map_. */
+  std::string BuildStatKey(const std::string &inlong_group_id,
+                           const std::string &inlong_stream_id) {
+    return inlong_group_id + kStatJoiner + inlong_stream_id;
+  }
+
+  /** Asks the reporting thread to stop and waits for it to exit. */
+  ~MetricManager() {
+    running_ = false;
+    if (update_thread_.joinable()) {
+      update_thread_.join();
+    }
+  }
+};
+} // namespace inlong
+#endif // INLONG_METRIC_MANAGER_H
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
index e4aef239e4..09014d728d 100644
---
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
+++
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc
@@ -44,6 +44,7 @@ void ProxyManager::Init() {
timeout_ = SdkConfig::getInstance()->manager_url_timeout_;
last_update_time_ = Utils::getCurrentMsTime();
if (__sync_bool_compare_and_swap(&inited_, false, true)) {
+ ReadLocalCache();
update_conf_thread_ = std::thread(&ProxyManager::Update, this);
}
}
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h
new file mode 100644
index 0000000000..e3d4a9bcc8
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INLONG_ENVIRONMENT_H
+#define INLONG_ENVIRONMENT_H
+
+#include <string>
+#include <sstream>
+namespace inlong {
+/**
+ * Plain holder for the process-level facts that are logged next to the
+ * metrics: SDK type, SDK version, local ip and process id.
+ */
+class Environment {
+ public:
+  std::string type_;
+  std::string version_;
+  std::string ip_;
+  // Starts at zero so that ToString() never reads an indeterminate value when
+  // it is called before setPid().
+  uint64_t pid_ = 0;
+
+  const std::string &getType() const { return type_; }
+  void setType(const std::string &type) { type_ = type; }
+  std::string getVersion() const { return version_; }
+  void setVersion(const std::string &version) { version_ = version; }
+  const std::string &getIp() const { return ip_; }
+  void setIp(const std::string &ip) { ip_ = ip; }
+  uint64_t getPid() const { return pid_; }
+  void setPid(uint64_t pid) { pid_ = pid; }
+
+  /**
+   * Renders ip, version and pid as "local ip[..] version [..] pid [..] ".
+   * type_ is not part of the output.
+   */
+  std::string ToString() const {
+    std::stringstream metric;
+    metric << "local ip[" << ip_ << "] ";
+    metric << "version [" << version_ << "] ";
+    metric << "pid [" << pid_ << "] ";
+    return metric.str();
+  }
+};
+} // namespace inlong
+#endif // INLONG_ENVIRONMENT_H
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h
new file mode 100644
index 0000000000..87bd991b28
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INLONG_METRIC_H
+#define INLONG_METRIC_H
+
+#include <cstdint>
+#include <sstream>
+namespace inlong {
+/**
+ * Counters for one stat key: send successes/failures (packs and messages),
+ * accumulated time cost with a four-bucket histogram, and three counters for
+ * rejected input (receive buffer full, message too long, metadata failure).
+ *
+ * The class does no locking of its own.
+ */
+class Metric {
+ private:
+  uint64_t send_success_pack_num_ = 0;
+  uint64_t send_success_msg_num_ = 0;
+  uint64_t send_failed_pack_num_ = 0;
+  uint64_t send_failed_msg_num_ = 0;
+
+  // Sum of every value passed to AddTimeCost(), plus how many of those values
+  // fell into each range. NOTE(review): the time unit is not stated here.
+  uint64_t time_cost_ = 0;
+  uint64_t time_cost_0t32_ = 0;
+  uint64_t time_cost_32t128_ = 0;
+  uint64_t time_cost_128t1024_ = 0;
+  uint64_t time_cost_1024t65536_ = 0;
+
+  uint64_t receive_buffer_full_count_ = 0;
+  uint64_t too_long_msg_count_ = 0;
+  uint64_t metadata_fail_count_ = 0;
+
+ public:
+  Metric() = default;
+
+  void AddSendSuccessPackNum(uint64_t num) { send_success_pack_num_ += num; }
+  void AddSendSuccessMsgNum(uint64_t num) { send_success_msg_num_ += num; }
+  void AddSendFailPackNum(uint64_t num) { send_failed_pack_num_ += num; }
+  void AddSendFailMsgNum(uint64_t num) { send_failed_msg_num_ += num; }
+  void AddReceiveBufferFullCount(uint64_t receive_buffer_full_count) {
+    receive_buffer_full_count_ += receive_buffer_full_count;
+  }
+  void AddTooLongMsgCount(uint64_t too_long_msg_count) { too_long_msg_count_ += too_long_msg_count; }
+  void AddMetadataFailCount(uint64_t metadata_fail_count) { metadata_fail_count_ += metadata_fail_count; }
+
+  uint64_t GetSendSuccessPackNum() const { return send_success_pack_num_; }
+  uint64_t GetSendSuccessMsgNum() const { return send_success_msg_num_; }
+  uint64_t GetSendFailedPackNum() const { return send_failed_pack_num_; }
+  uint64_t GetSendFailedMsgNum() const { return send_failed_msg_num_; }
+  uint64_t GetTimeCost() const { return time_cost_; }
+  uint64_t GetTimeCost0T32() const { return time_cost_0t32_; }
+  uint64_t GetTimeCost32T128() const { return time_cost_32t128_; }
+  uint64_t GetTimeCost128T1024() const { return time_cost_128t1024_; }
+  uint64_t GetTimeCost1024T65536() const { return time_cost_1024t65536_; }
+  uint64_t GetReceiveBufferFullCount() const { return receive_buffer_full_count_; }
+  uint64_t GetTooLongMsgCount() const { return too_long_msg_count_; }
+  uint64_t GetMetadataFailCount() const { return metadata_fail_count_; }
+
+  /**
+   * Adds `time_cost` to the running total and bumps the histogram bucket it
+   * falls into: [0,32), [32,128), [128,1024), or everything from 1024 up.
+   */
+  void AddTimeCost(uint64_t time_cost) {
+    time_cost_ += time_cost;
+    if (time_cost < 32) {
+      ++time_cost_0t32_;
+    } else if (time_cost < 128) {
+      ++time_cost_32t128_;
+    } else if (time_cost < 1024) {
+      ++time_cost_128t1024_;
+    } else {
+      ++time_cost_1024t65536_;
+    }
+  }
+
+  /** Sets every counter back to zero. */
+  void ResetStat() { *this = Metric(); }
+
+  /**
+   * Adds every counter of `stat` to this object, including the buffer-full,
+   * too-long-message and metadata-failure counters, so nothing recorded on
+   * `stat` is lost in the merge.
+   */
+  void Update(const Metric &stat) {
+    send_success_pack_num_ += stat.send_success_pack_num_;
+    send_success_msg_num_ += stat.send_success_msg_num_;
+    send_failed_pack_num_ += stat.send_failed_pack_num_;
+    send_failed_msg_num_ += stat.send_failed_msg_num_;
+    time_cost_ += stat.time_cost_;
+    time_cost_0t32_ += stat.time_cost_0t32_;
+    time_cost_32t128_ += stat.time_cost_32t128_;
+    time_cost_128t1024_ += stat.time_cost_128t1024_;
+    time_cost_1024t65536_ += stat.time_cost_1024t65536_;
+    receive_buffer_full_count_ += stat.receive_buffer_full_count_;
+    too_long_msg_count_ += stat.too_long_msg_count_;
+    metadata_fail_count_ += stat.metadata_fail_count_;
+  }
+
+  /**
+   * Average time cost per pack. The divisor is the pack total plus one, which
+   * keeps the division defined while no pack has been recorded yet.
+   */
+  uint64_t getTransTime() const {
+    uint64_t pack_num = send_success_pack_num_ + send_failed_pack_num_ + 1;
+    return time_cost_ / pack_num;
+  }
+
+  /** Renders only the send counters and the average time cost. */
+  std::string GetSendMetricInfo() const {
+    std::stringstream metric;
+    metric << "success-pack[" << send_success_pack_num_ << "] ";
+    metric << "msg[" << send_success_msg_num_ << "] ";
+    metric << "failed-pack[" << send_failed_pack_num_ << "] ";
+    metric << "msg[" << send_failed_msg_num_ << "] ";
+    metric << "trans[" << getTransTime() << "] ";
+    return metric.str();
+  }
+
+  /** Renders the send summary followed by the three rejected-input counters. */
+  std::string ToString() const {
+    std::stringstream metric;
+    metric << GetSendMetricInfo();
+    metric << "buffer full[" << receive_buffer_full_count_ << "] ";
+    metric << "too long msg[" << too_long_msg_count_ << "] ";
+    metric << "metadata fail[" << metadata_fail_count_ << "] ";
+    return metric.str();
+  }
+};
+} // namespace inlong
+
+#endif // INLONG_METRIC_H
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/CMakeLists.txt
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/CMakeLists.txt
index 6ecf82464b..a95af7462f 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/CMakeLists.txt
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/CMakeLists.txt
@@ -19,6 +19,9 @@
cmake_minimum_required(VERSION 3.1)
-aux_source_directory(. UTILS_SRCS)
-
+file(GLOB UTILS_SRCS
+ "*.cc"
+ "*.h"
+ )
add_library(utils STATIC ${UTILS_SRCS})
+
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
index 399bd1b348..dbac24f6a6 100644
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h
@@ -108,6 +108,7 @@ static const int32_t kWeight[30] = {1, 1, 1, 1, 1, 2,
2, 2, 2, 2,
static const char kCacheFile[] = ".proxy_list.ini";
static const char kCacheTmpFile[] = ".proxy_list.ini.tmp";
const int MAX_RETRY = 10;
+static const int kMetricIntervalMinutes = 1;
} // namespace constants
} // namespace inlong