This is an automated email from the ASF dual-hosted git repository. djwang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit 1c1bf580b06764892cb30df704f8c0c4afecb19b Author: Maxim Smyatkin <[email protected]> AuthorDate: Tue Jun 13 16:51:40 2023 +0300 [yagp_hooks_collector] Fix EventSender and GrpcConnector in forked processes Delay initialization of static singletons and GRPC connections to actual query handling time rather than _PG_init, since both are incompatible with fork(). --- debian/control | 4 ++-- src/EventSender.cpp | 10 ++-------- src/EventSender.h | 6 ++---- src/GrpcConnector.cpp | 33 ++++++++++++++++++++++----------- src/hook_wrappers.cpp | 33 +++++++++++++++++++++++---------- 5 files changed, 51 insertions(+), 35 deletions(-) diff --git a/debian/control b/debian/control index c740a8590ca..07176e94be5 100644 --- a/debian/control +++ b/debian/control @@ -2,10 +2,10 @@ Source: greenplum-6-yagpcc-hooks Section: misc Priority: optional Maintainer: Maxim Smyatkin <[email protected]> -Build-Depends: make, gcc, g++, debhelper (>=9), greenplum-db-6 (>=6.19.3), protobuf-compiler, protobuf-compiler-grpc, libgrpc++1, libgrpc++-dev +Build-Depends: make, gcc, g++, debhelper (>=9), greenplum-db-6 (>=6.19.3), ya-grpc (=1.46-57-50820-02384e3918-yandex) Standards-Version: 3.9.8 Package: greenplum-6-yagpcc-hooks Architecture: any -Depends: ${misc:Depends}, ${shlibs:Depends}, greenplum-db-6 (>=6.19.3) +Depends: ${misc:Depends}, ${shlibs:Depends}, greenplum-db-6 (>=6.19.3), ya-grpc (=1.46-57-50820-02384e3918-yandex) Description: Greenplum extension to send query execution metrics to yandex command center agent diff --git a/src/EventSender.cpp b/src/EventSender.cpp index b1f85cf9f1e..ec966e8686c 100644 --- a/src/EventSender.cpp +++ b/src/EventSender.cpp @@ -329,12 +329,6 @@ void EventSender::send_query_info(yagpcc::SetQueryReq *req, } } -EventSender *EventSender::instance() { - static EventSender sender; - return &sender; -} +EventSender::EventSender() { connector = std::make_unique<GrpcConnector>(); } -EventSender::EventSender() { - Config::init(); - connector = std::make_unique<GrpcConnector>(); -} \ No newline at end of file +EventSender::~EventSender() { connector.release(); } \ No newline at end of file diff --git a/src/EventSender.h b/src/EventSender.h index 9e2ef992f81..92e6937a690 100644 --- a/src/EventSender.h +++ b/src/EventSender.h @@ -17,15 +17,13 @@ public: void query_metrics_collect(QueryMetricsStatus status, void *arg); void incr_depth() { nesting_level++; } void decr_depth() { nesting_level--; } - static EventSender *instance(); + EventSender(); + ~EventSender(); private: void collect_query_submit(QueryDesc *query_desc); void collect_query_done(QueryDesc *query_desc, const std::string &status); - - EventSender(); void send_query_info(yagpcc::SetQueryReq *req, const std::string &event); std::unique_ptr<GrpcConnector> connector; - int nesting_level = 0; }; \ No newline at end of file diff --git a/src/GrpcConnector.cpp b/src/GrpcConnector.cpp index 276c9ceb8a8..966bfb4a780 100644 --- a/src/GrpcConnector.cpp +++ b/src/GrpcConnector.cpp @@ -10,14 +10,17 @@ #include <string> #include <thread> -extern "C" { +extern "C" +{ #include "postgres.h" #include "cdb/cdbvars.h" } -class GrpcConnector::Impl { +class GrpcConnector::Impl +{ public: - Impl() : SOCKET_FILE("unix://" + Config::uds_path()) { + Impl() : SOCKET_FILE("unix://" + Config::uds_path()) + { GOOGLE_PROTOBUF_VERIFY_VERSION; channel = grpc::CreateChannel(SOCKET_FILE, grpc::InsecureChannelCredentials()); @@ -27,15 +30,18 @@ public: reconnect_thread = std::thread(&Impl::reconnect, this); } - ~Impl() { + ~Impl() + { done = true; cv.notify_one(); reconnect_thread.join(); } - yagpcc::MetricResponse set_metric_query(yagpcc::SetQueryReq req) { + yagpcc::MetricResponse set_metric_query(yagpcc::SetQueryReq req) + { yagpcc::MetricResponse response; - if (!connected) { + if (!connected) + { response.set_error_code(yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR); response.set_error_text( "Not tracing this query connection to agent has been lost"); @@ -47,7 +53,8 @@ public: std::chrono::system_clock::now() + std::chrono::milliseconds(timeout); context.set_deadline(deadline); grpc::Status status = (stub->SetMetricQuery)(&context, req, &response); - if (!status.ok()) { + if (!status.ok()) + { response.set_error_text("Connection lost: " + status.error_message() + "; " + status.error_details()); response.set_error_code(yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR); @@ -68,13 +75,16 @@ private: std::mutex mtx; bool done; - void reconnect() { - while (!done) { + void reconnect() + { + while (!done) + { { std::unique_lock<std::mutex> lock(mtx); cv.wait(lock); } - while (!connected && !done) { + while (!connected && !done) + { auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(100); connected = channel->WaitForConnected(deadline); @@ -88,6 +98,7 @@ GrpcConnector::GrpcConnector() { impl = new Impl(); } GrpcConnector::~GrpcConnector() { delete impl; } yagpcc::MetricResponse -GrpcConnector::set_metric_query(yagpcc::SetQueryReq req) { +GrpcConnector::set_metric_query(yagpcc::SetQueryReq req) +{ return impl->set_metric_query(req); } \ No newline at end of file diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp index a904dc9bafd..66ba6547ce2 100644 --- a/src/hook_wrappers.cpp +++ b/src/hook_wrappers.cpp @@ -28,7 +28,17 @@ static void ya_ExecutorFinish_hook(QueryDesc *query_desc); static void ya_ExecutorEnd_hook(QueryDesc *query_desc); static void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg); +static EventSender *sender = nullptr; + +static inline EventSender *get_sender() { + if (!sender) { + sender = new EventSender(); + } + return sender; +} + void hooks_init() { + Config::init(); previous_ExecutorStart_hook = ExecutorStart_hook; ExecutorStart_hook = ya_ExecutorStart_hook; previous_ExecutorRun_hook = ExecutorRun_hook; @@ -49,11 +59,14 @@ void hooks_deinit() { ExecutorEnd_hook = previous_ExecutorEnd_hook; query_info_collect_hook = previous_query_info_collect_hook; stat_statements_parser_deinit(); + if (sender) { + delete sender; + } } void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags) { PG_TRY(); - { EventSender::instance()->executor_before_start(query_desc, eflags); } + { get_sender()->executor_before_start(query_desc, eflags); } PG_CATCH(); { ereport(WARNING, @@ -66,7 +79,7 @@ void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags) { standard_ExecutorStart(query_desc, eflags); } PG_TRY(); - { EventSender::instance()->executor_after_start(query_desc, eflags); } + { get_sender()->executor_after_start(query_desc, eflags); } PG_CATCH(); { ereport(WARNING, @@ -77,36 +90,36 @@ void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags) { void ya_ExecutorRun_hook(QueryDesc *query_desc, ScanDirection direction, long count) { - EventSender::instance()->incr_depth(); + get_sender()->incr_depth(); PG_TRY(); { if (previous_ExecutorRun_hook) previous_ExecutorRun_hook(query_desc, direction, count); else standard_ExecutorRun(query_desc, direction, count); - EventSender::instance()->decr_depth(); + get_sender()->decr_depth(); } PG_CATCH(); { - EventSender::instance()->decr_depth(); + get_sender()->decr_depth(); PG_RE_THROW(); } PG_END_TRY(); } void ya_ExecutorFinish_hook(QueryDesc *query_desc) { - EventSender::instance()->incr_depth(); + get_sender()->incr_depth(); PG_TRY(); { if (previous_ExecutorFinish_hook) previous_ExecutorFinish_hook(query_desc); else standard_ExecutorFinish(query_desc); - EventSender::instance()->decr_depth(); + get_sender()->decr_depth(); } PG_CATCH(); { - EventSender::instance()->decr_depth(); + get_sender()->decr_depth(); PG_RE_THROW(); } PG_END_TRY(); @@ -114,7 +127,7 @@ void ya_ExecutorFinish_hook(QueryDesc *query_desc) { void ya_ExecutorEnd_hook(QueryDesc *query_desc) { PG_TRY(); - { EventSender::instance()->executor_end(query_desc); } + { get_sender()->executor_end(query_desc); } PG_CATCH(); { ereport(WARNING, (errmsg("EventSender failed in ya_ExecutorEnd_hook"))); } PG_END_TRY(); @@ -127,7 +140,7 @@ void ya_ExecutorEnd_hook(QueryDesc *query_desc) { void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg) { PG_TRY(); - { EventSender::instance()->query_metrics_collect(status, arg); } + { get_sender()->query_metrics_collect(status, arg); } PG_CATCH(); { ereport(WARNING, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
