This is an automated email from the ASF dual-hosted git repository. djwang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit c09ea5d21a11604993fc3a6bb2ece50b1a007698 Author: Maxim Smyatkin <[email protected]> AuthorDate: Mon Oct 2 12:54:32 2023 +0300 [yagp_hooks_collector] Replace GRPC transport with protobuf-over-UDS Remove GRPC dependency. Serialize metrics as protobuf messages and deliver them over a Unix domain socket. Replace server-side message queue with incremental per-query message building. Add clang-format configuration. Use deprecated protobuf API for bionic compatibility. --- .clang-format | 2 + protos/yagpcc_set_service.proto | 23 ++----- src/EventSender.cpp | 115 +++++++++++++++++++++------------- src/EventSender.h | 10 ++- src/GrpcConnector.cpp | 133 ---------------------------------------- src/GrpcConnector.h | 15 ----- src/UDSConnector.cpp | 83 +++++++++++++++++++++++++ src/UDSConnector.h | 13 ++++ 8 files changed, 183 insertions(+), 211 deletions(-) diff --git a/.clang-format b/.clang-format new file mode 100644 index 00000000000..99130575c9a --- /dev/null +++ b/.clang-format @@ -0,0 +1,2 @@ +BasedOnStyle: LLVM +SortIncludes: false diff --git a/protos/yagpcc_set_service.proto b/protos/yagpcc_set_service.proto index 93c2f5a01d1..e8fc7aaa99d 100644 --- a/protos/yagpcc_set_service.proto +++ b/protos/yagpcc_set_service.proto @@ -9,23 +9,6 @@ package yagpcc; option java_outer_classname = "SegmentYAGPCCAS"; option go_package = "a.yandex-team.ru/cloud/mdb/yagpcc/api/proto/agent_segment;greenplum"; -service SetQueryInfo { - rpc SetMetricPlanNode (SetPlanNodeReq) returns (MetricResponse) {} - - rpc SetMetricQuery (SetQueryReq) returns (MetricResponse) {} -} - -message MetricResponse { - MetricResponseStatusCode error_code = 1; - string error_text = 2; -} - -enum MetricResponseStatusCode { - METRIC_RESPONSE_STATUS_CODE_UNSPECIFIED = 0; - METRIC_RESPONSE_STATUS_CODE_SUCCESS = 1; - METRIC_RESPONSE_STATUS_CODE_ERROR = 2; -} - message SetQueryReq { QueryStatus query_status = 1; google.protobuf.Timestamp datetime = 2; @@ -34,6 +17,9 @@ message SetQueryReq { QueryInfo query_info = 5; GPMetrics query_metrics = 6; repeated MetricPlan plan_tree = 7; + google.protobuf.Timestamp submit_time = 8; + google.protobuf.Timestamp start_time = 9; + google.protobuf.Timestamp end_time = 10; } message SetPlanNodeReq { @@ -43,4 +29,7 @@ message SetPlanNodeReq { SegmentKey segment_key = 4; GPMetrics node_metrics = 5; MetricPlan plan_node = 6; + google.protobuf.Timestamp submit_time = 7; + google.protobuf.Timestamp start_time = 8; + google.protobuf.Timestamp end_time = 9; } diff --git a/src/EventSender.cpp b/src/EventSender.cpp index 9146078fd0e..834553a6187 100644 --- a/src/EventSender.cpp +++ b/src/EventSender.cpp @@ -1,6 +1,6 @@ #include "Config.h" -#include "GrpcConnector.h" #include "ProcStats.h" +#include "UDSConnector.h" #include <chrono> #include <ctime> @@ -15,7 +15,6 @@ extern "C" { #include "commands/resgroupcmds.h" #include "executor/executor.h" #include "utils/elog.h" -#include "utils/metrics_utils.h" #include "utils/workfile_mgr.h" #include "cdb/cdbdisp.h" @@ -102,33 +101,46 @@ void set_plan_text(std::string *plan_text, QueryDesc *query_desc) { *plan_text = std::string(es.str->data, es.str->len); } -void set_query_plan(yagpcc::QueryInfo *qi, QueryDesc *query_desc) { - qi->set_generator(query_desc->plannedstmt->planGen == PLANGEN_OPTIMIZER - ? yagpcc::PlanGenerator::PLAN_GENERATOR_OPTIMIZER - : yagpcc::PlanGenerator::PLAN_GENERATOR_PLANNER); - set_plan_text(qi->mutable_plan_text(), query_desc); - StringInfo norm_plan = gen_normplan(qi->plan_text().c_str()); - *qi->mutable_template_plan_text() = std::string(norm_plan->data); - qi->set_plan_id(hash_any((unsigned char *)norm_plan->data, norm_plan->len)); +void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc *query_desc) { + if (Gp_session_role == GP_ROLE_DISPATCH && query_desc->plannedstmt) { + auto qi = req->mutable_query_info(); + qi->set_generator(query_desc->plannedstmt->planGen == PLANGEN_OPTIMIZER + ? yagpcc::PlanGenerator::PLAN_GENERATOR_OPTIMIZER + : yagpcc::PlanGenerator::PLAN_GENERATOR_PLANNER); + set_plan_text(qi->mutable_plan_text(), query_desc); + StringInfo norm_plan = gen_normplan(qi->plan_text().c_str()); + *qi->mutable_template_plan_text() = std::string(norm_plan->data); + qi->set_plan_id(hash_any((unsigned char *)norm_plan->data, norm_plan->len)); + // TODO: For now assume queryid equal to planid, which is wrong. The + // reason for doing so this bug + // https://github.com/greenplum-db/gpdb/pull/15385 (ORCA loses + // pg_stat_statements` queryid during planning phase). Need to fix it + // upstream, cherry-pick and bump gp + // qi->set_query_id(query_desc->plannedstmt->queryId); + qi->set_query_id(qi->plan_id()); + } } -void set_query_text(yagpcc::QueryInfo *qi, QueryDesc *query_desc) { - *qi->mutable_query_text() = query_desc->sourceText; - char *norm_query = gen_normquery(query_desc->sourceText); - *qi->mutable_template_query_text() = std::string(norm_query); +void set_query_text(yagpcc::SetQueryReq *req, QueryDesc *query_desc) { + if (Gp_session_role == GP_ROLE_DISPATCH && query_desc->sourceText) { + auto qi = req->mutable_query_info(); + *qi->mutable_query_text() = query_desc->sourceText; + char *norm_query = gen_normquery(query_desc->sourceText); + *qi->mutable_template_query_text() = std::string(norm_query); + } } -void set_query_info(yagpcc::SetQueryReq *req, QueryDesc *query_desc, - bool with_text, bool with_plan) { +void clear_big_fields(yagpcc::SetQueryReq *req) { + if (Gp_session_role == GP_ROLE_DISPATCH) { + auto qi = req->mutable_query_info(); + qi->clear_plan_text(); + qi->clear_query_text(); + } +} + +void set_query_info(yagpcc::SetQueryReq *req, QueryDesc *query_desc) { if (Gp_session_role == GP_ROLE_DISPATCH) { auto qi = req->mutable_query_info(); - if (query_desc->sourceText && with_text) { - set_query_text(qi, query_desc); - } - if (query_desc->plannedstmt && with_plan) { - set_query_plan(qi, query_desc); - qi->set_query_id(query_desc->plannedstmt->queryId); - } qi->set_allocated_username(get_user_name()); qi->set_allocated_databasename(get_db_name()); qi->set_allocated_rsgname(get_rg_name()); @@ -245,6 +257,10 @@ void EventSender::executor_before_start(QueryDesc *query_desc, if (!need_collect()) { return; } + if (query_msg->has_query_key()) { + connector->report_query(*query_msg, "previous query"); + query_msg->Clear(); + } query_start_time = std::chrono::high_resolution_clock::now(); WorkfileResetBackendStats(); if (Gp_role == GP_ROLE_DISPATCH && Config::enable_analyze()) { @@ -268,10 +284,12 @@ void EventSender::executor_after_start(QueryDesc *query_desc, int /* eflags*/) { } if ((Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) && need_collect()) { - auto req = - create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_START); - set_query_info(&req, query_desc, false, true); - connector->set_metric_query(req, "started"); + query_msg->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_START); + *query_msg->mutable_start_time() = current_ts(); + set_query_plan(query_msg, query_desc); + if (connector->report_query(*query_msg, "started")) { + clear_big_fields(query_msg); + } } } @@ -284,21 +302,21 @@ void EventSender::executor_end(QueryDesc *query_desc) { return; } /* TODO: when querying via CURSOR this call freezes. Need to investigate. - To reproduce - uncomment it and run installchecks. It will freeze around join test. - Needs investigation - + To reproduce - uncomment it and run installchecks. It will freeze around + join test. Needs investigation + if (Gp_role == GP_ROLE_DISPATCH && Config::enable_analyze() && Config::enable_cdbstats() && query_desc->estate->dispatcherState && query_desc->estate->dispatcherState->primaryResults) { cdbdisp_checkDispatchResult(query_desc->estate->dispatcherState, DISPATCH_WAIT_NONE); }*/ - auto req = - create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_END); - // NOTE: there are no cummulative spillinfo stats AFAIU, so no need to - // gather it here. It only makes sense when doing regular stat checks. - set_gp_metrics(req.mutable_query_metrics(), query_desc); - connector->set_metric_query(req, "ended"); + query_msg->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_END); + *query_msg->mutable_end_time() = current_ts(); + set_gp_metrics(query_msg->mutable_query_metrics(), query_desc); + if (connector->report_query(*query_msg, "ended")) { + query_msg->Clear(); + } } void EventSender::collect_query_submit(QueryDesc *query_desc) { @@ -306,10 +324,14 @@ void EventSender::collect_query_submit(QueryDesc *query_desc) { return; } if (need_collect()) { - auto req = + *query_msg = create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_SUBMIT); - set_query_info(&req, query_desc, true, false); - connector->set_metric_query(req, "submit"); + *query_msg->mutable_submit_time() = current_ts(); + set_query_info(query_msg, query_desc); + set_query_text(query_msg, query_desc); + if (connector->report_query(*query_msg, "submit")) { + clear_big_fields(query_msg); + } } } @@ -319,20 +341,25 @@ void EventSender::collect_query_done(QueryDesc *query_desc, return; } if (need_collect()) { - auto req = - create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_DONE); - connector->set_metric_query(req, status); + query_msg->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_DONE); + if (connector->report_query(*query_msg, status)) { + clear_big_fields(query_msg); + } } } EventSender::EventSender() { if (Config::enable_collector() && !Config::filter_user(get_user_name())) { + query_msg = new yagpcc::SetQueryReq(); try { - connector = new GrpcConnector(); + connector = new UDSConnector(); } catch (const std::exception &e) { ereport(INFO, (errmsg("Unable to start query tracing %s", e.what()))); } } } -EventSender::~EventSender() { delete connector; } \ No newline at end of file +EventSender::~EventSender() { + delete query_msg; + delete connector; +} \ No newline at end of file diff --git a/src/EventSender.h b/src/EventSender.h index 2af8b7ffa03..161bf6ce037 100644 --- a/src/EventSender.h +++ b/src/EventSender.h @@ -1,9 +1,14 @@ #pragma once #include <memory> +#include <queue> #include <string> -class GrpcConnector; +extern "C" { +#include "utils/metrics_utils.h" +} + +class UDSConnector; struct QueryDesc; namespace yagpcc { class SetQueryReq; @@ -23,6 +28,7 @@ public: private: void collect_query_submit(QueryDesc *query_desc); void collect_query_done(QueryDesc *query_desc, const std::string &status); - GrpcConnector *connector = nullptr; + UDSConnector *connector = nullptr; int nesting_level = 0; + yagpcc::SetQueryReq *query_msg; }; \ No newline at end of file diff --git a/src/GrpcConnector.cpp b/src/GrpcConnector.cpp deleted file mode 100644 index 73c1944fa04..00000000000 --- a/src/GrpcConnector.cpp +++ /dev/null @@ -1,133 +0,0 @@ -#include "GrpcConnector.h" -#include "Config.h" -#include "yagpcc_set_service.grpc.pb.h" - -#include <atomic> -#include <condition_variable> -#include <grpc++/channel.h> -#include <grpc++/grpc++.h> -#include <mutex> -#include <pthread.h> -#include <signal.h> -#include <string> -#include <thread> - -extern "C" { -#include "postgres.h" -#include "cdb/cdbvars.h" -} - -/* - * Set up the thread signal mask, we don't want to run our signal handlers - * in downloading and uploading threads. - */ -static void MaskThreadSignals() { - sigset_t sigs; - - if (pthread_equal(main_tid, pthread_self())) { - ereport(ERROR, (errmsg("thread_mask is called from main thread!"))); - return; - } - - sigemptyset(&sigs); - - /* make our thread to ignore these signals (which should allow that they be - * delivered to the main thread) */ - sigaddset(&sigs, SIGHUP); - sigaddset(&sigs, SIGINT); - sigaddset(&sigs, SIGTERM); - sigaddset(&sigs, SIGALRM); - sigaddset(&sigs, SIGUSR1); - sigaddset(&sigs, SIGUSR2); - - pthread_sigmask(SIG_BLOCK, &sigs, NULL); -} - -class GrpcConnector::Impl { -public: - Impl() : SOCKET_FILE("unix://" + Config::uds_path()) { - GOOGLE_PROTOBUF_VERIFY_VERSION; - channel = - grpc::CreateChannel(SOCKET_FILE, grpc::InsecureChannelCredentials()); - stub = yagpcc::SetQueryInfo::NewStub(channel); - connected = true; - reconnected = false; - done = false; - reconnect_thread = std::thread(&Impl::reconnect, this); - } - - ~Impl() { - done = true; - cv.notify_one(); - reconnect_thread.join(); - } - - yagpcc::MetricResponse set_metric_query(const yagpcc::SetQueryReq &req, - const std::string &event) { - yagpcc::MetricResponse response; - if (!connected) { - response.set_error_code(yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR); - response.set_error_text( - "Not tracing this query because grpc connection has been lost"); - return response; - } else if (reconnected) { - reconnected = false; - ereport(LOG, (errmsg("GRPC connection is restored"))); - } - grpc::ClientContext context; - int timeout = Gp_role == GP_ROLE_DISPATCH ? 500 : 250; - auto deadline = - std::chrono::system_clock::now() + std::chrono::milliseconds(timeout); - context.set_deadline(deadline); - grpc::Status status = (stub->SetMetricQuery)(&context, req, &response); - if (!status.ok()) { - response.set_error_text("GRPC error: " + status.error_message() + "; " + - status.error_details()); - response.set_error_code(yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR); - ereport(LOG, (errmsg("Query {%d-%d-%d} %s tracing failed with error %s", - req.query_key().tmid(), req.query_key().ssid(), - req.query_key().ccnt(), event.c_str(), - response.error_text().c_str()))); - connected = false; - reconnected = false; - cv.notify_one(); - } - - return response; - } - -private: - const std::string SOCKET_FILE; - std::unique_ptr<yagpcc::SetQueryInfo::Stub> stub; - std::shared_ptr<grpc::Channel> channel; - std::atomic_bool connected, reconnected, done; - std::thread reconnect_thread; - std::condition_variable cv; - std::mutex mtx; - - void reconnect() { - MaskThreadSignals(); - while (!done) { - { - std::unique_lock<std::mutex> lock(mtx); - cv.wait(lock); - } - while (!connected && !done) { - auto deadline = - std::chrono::system_clock::now() + std::chrono::milliseconds(100); - connected = channel->WaitForConnected(deadline); - reconnected = connected.load(); - } - } - } -}; - -GrpcConnector::GrpcConnector() { impl = new Impl(); } - -GrpcConnector::~GrpcConnector() { delete impl; } - -yagpcc::MetricResponse -GrpcConnector::set_metric_query(const yagpcc::SetQueryReq &req, - const std::string &event) { - return impl->set_metric_query(req, event); -} \ No newline at end of file diff --git a/src/GrpcConnector.h b/src/GrpcConnector.h deleted file mode 100644 index 6571c626dfd..00000000000 --- a/src/GrpcConnector.h +++ /dev/null @@ -1,15 +0,0 @@ -#pragma once - -#include "protos/yagpcc_set_service.pb.h" - -class GrpcConnector { -public: - GrpcConnector(); - ~GrpcConnector(); - yagpcc::MetricResponse set_metric_query(const yagpcc::SetQueryReq &req, - const std::string &event); - -private: - class Impl; - Impl *impl; -}; \ No newline at end of file diff --git a/src/UDSConnector.cpp b/src/UDSConnector.cpp new file mode 100644 index 00000000000..339a5d4f374 --- /dev/null +++ b/src/UDSConnector.cpp @@ -0,0 +1,83 @@ +#include "UDSConnector.h" +#include "Config.h" + +#include <string> +#include <unistd.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <sys/types.h> +#include <sys/fcntl.h> +#include <chrono> +#include <thread> + +extern "C" { +#include "postgres.h" +#include "cdb/cdbvars.h" +} + +UDSConnector::UDSConnector() : uds_path("unix://" + Config::uds_path()) { + GOOGLE_PROTOBUF_VERIFY_VERSION; +} + +static void inline log_tracing_failure(const yagpcc::SetQueryReq &req, + const std::string &event) { + ereport(LOG, + (errmsg("Query {%d-%d-%d} %s tracing failed with error %s", + req.query_key().tmid(), req.query_key().ssid(), + req.query_key().ccnt(), event.c_str(), strerror(errno)))); +} + +bool UDSConnector::report_query(const yagpcc::SetQueryReq &req, + const std::string &event) { + sockaddr_un address; + address.sun_family = AF_UNIX; + strcpy(address.sun_path, uds_path.c_str()); + bool success = true; + auto sockfd = socket(AF_UNIX, SOCK_STREAM, 0); + if (sockfd != -1) { + if (fcntl(sockfd, F_SETFL, O_NONBLOCK) != -1) { + if (connect(sockfd, (sockaddr *)&address, sizeof(address)) != -1) { + auto data_size = req.ByteSize(); + auto total_size = data_size + sizeof(uint32_t); + uint8_t *buf = (uint8_t *)palloc(total_size); + uint32_t *size_payload = (uint32_t *)buf; + *size_payload = data_size; + req.SerializeWithCachedSizesToArray(buf + sizeof(uint32_t)); + int64_t sent = 0, sent_total = 0; + do { + sent = send(sockfd, buf + sent_total, total_size - sent_total, + MSG_DONTWAIT); + sent_total += sent; + } while ( + sent > 0 && size_t(sent_total) != total_size && + // the line below is a small throttling hack: + // if a message does not fit a single packet, we take a nap + // before sending the next one. + // Otherwise, MSG_DONTWAIT send might overflow the UDS + (std::this_thread::sleep_for(std::chrono::milliseconds(1)), true)); + if (sent < 0) { + log_tracing_failure(req, event); + success = false; + } + pfree(buf); + } else { + // log the error and go on + log_tracing_failure(req, event); + success = false; + } + } else { + // That's a very important error that should never happen, so make it + // visible to an end-user and admins. + ereport(WARNING, + (errmsg("Unable to create non-blocking socket connection %s", + strerror(errno)))); + success = false; + } + close(sockfd); + } else { + // log the error and go on + log_tracing_failure(req, event); + success = false; + } + return success; +} \ No newline at end of file diff --git a/src/UDSConnector.h b/src/UDSConnector.h new file mode 100644 index 00000000000..574653023e6 --- /dev/null +++ b/src/UDSConnector.h @@ -0,0 +1,13 @@ +#pragma once + +#include "protos/yagpcc_set_service.pb.h" +#include <queue> + +class UDSConnector { +public: + UDSConnector(); + bool report_query(const yagpcc::SetQueryReq &req, const std::string &event); + +private: + const std::string uds_path; +}; \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
