This is an automated email from the ASF dual-hosted git repository. yjhjstz pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit efa99fdc3ffcf1d05439cfac61d6308be820c977 Author: Jianghua Yang <[email protected]> AuthorDate: Fri Jun 20 23:24:56 2025 +0000 Use event_base with libevent 2.0+ to avoid thread-unsafe event_init in gpfdist The legacy event_init() function is not thread-safe and can cause issues when gpfdist is run in multi-threaded environments. This patch updates gpfdist to use libevent 2.0+'s thread-safe APIs, specifically `event_base` along with `event_assign()` and `evtimer_assign()`. A new global `gcb.event_base` is introduced and used when compiled with libevent ≥ 2.0.1. This avoids the need for the deprecated and non-thread-safe `event_set()` / `evtimer_set()` APIs, and prepares gpfdist for better thread safety. --- src/bin/gpfdist/gpfdist.c | 59 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/src/bin/gpfdist/gpfdist.c b/src/bin/gpfdist/gpfdist.c index ab3a5b159c4..ad7fb868313 100644 --- a/src/bin/gpfdist/gpfdist.c +++ b/src/bin/gpfdist/gpfdist.c @@ -233,6 +233,7 @@ static struct SSL_CTX *server_ctx;/* for SSL */ #endif int wdtimer; /* Kill gpfdist after k seconds of inactivity. 0 to disable. */ + struct event_base *event_base; /* for libevent 2.0+ */ } gcb; /* A session */ @@ -1600,7 +1601,7 @@ static void session_detach(request_t* r) } event_del(&session->ev); - evtimer_set(&session->ev, free_session_cb, session); + evtimer_assign(&session->ev, gcb.event_base, free_session_cb, session); session->tm.tv_sec = opt.w; session->tm.tv_usec = 0; (void)evtimer_add(&session->ev, &session->tm); @@ -1811,7 +1812,7 @@ static int session_attach(request_t* r) session->active_segids[r->segid] = 1; /* mark this segid as active */ session->maxsegs = r->totalsegs; session->requests = apr_hash_make(pool); - event_set(&session->ev, 0, 0, 0, 0); + event_assign(&session->ev, gcb.event_base, -1, 0, NULL, NULL); if (session->tid == 0 || session->path == 0 || session->key == 0) gfatal(r, "out of memory in session_attach"); @@ -2368,7 +2369,7 @@ static void do_accept(int fd, short event, void* arg) r->pool = pool; r->sock = sock; - event_set(&r->ev, 0, 0, 0, 0); + event_assign(&r->ev, gcb.event_base, -1, 0, NULL, NULL); /* use the block size specified by -m option */ r->outblock.data = palloc_safe(r, pool, opt.m, "out of memory when allocating buffer: %d bytes", opt.m); @@ -2421,7 +2422,7 @@ static int setup_write(request_t* r) if (r->sock < 0) gwarning(r, "internal error in setup_write - no socket to use"); event_del(&r->ev); - event_set(&r->ev, r->sock, EV_WRITE, do_write, r); + event_assign(&r->ev, gcb.event_base, r->sock, EV_WRITE, do_write, r); return (event_add(&r->ev, 0)); } @@ -2445,7 +2446,7 @@ static int setup_read(request_t* r) gwarning(r, "internal error in setup_read - no socket to use"); event_del(&r->ev); - event_set(&r->ev, r->sock, EV_READ, do_read_request, r); + event_assign(&r->ev, gcb.event_base, r->sock, EV_READ, do_read_request, r); if(opt.t == 0) { @@ -2552,18 +2553,32 @@ static void signal_register() { /* when SIGTERM raised invoke process_term_signal */ - signal_set(&gcb.signal_event,SIGTERM,process_term_signal,0); + evsignal_assign(&gcb.signal_event, gcb.event_base, SIGTERM, process_term_signal, 0); /* high priority so we accept as fast as possible */ if(event_priority_set(&gcb.signal_event, 0)) gwarning(NULL,"signal event priority set failed"); /* start watching this event */ - if(signal_add(&gcb.signal_event, 0)) + if(evsignal_add(&gcb.signal_event, 0)) gfatal(NULL,"cannot set up event on signal register"); } +/* + * gpfdist_cleanup + * + * Clean up all resources before exiting + */ +static void gpfdist_cleanup(void) +{ + /* Clean up event_base if initialized */ + if (gcb.event_base) { + event_base_free(gcb.event_base); + gcb.event_base = NULL; + } +} + static void clear_listen_sock(void) { SOCKET sock = -1; @@ -2616,9 +2631,8 @@ http_setup(void) hostaddr = opt.b; /* setup event priority */ - if (event_priority_init(10)) - gwarning(NULL, "event_priority_init failed"); - + if (event_base_priority_init(gcb.event_base, 10)) + gwarning(NULL, "event_base_priority_init failed"); /* Try each possible port from opt.p to opt.last_port */ for (;;) @@ -2811,8 +2825,8 @@ http_setup(void) for (i = 0; i < gcb.listen_sock_count; i++) { /* when this socket is ready, do accept */ - event_set(&gcb.listen_events[i], gcb.listen_socks[i], EV_READ | EV_PERSIST, - do_accept, 0); + event_assign(&gcb.listen_events[i], gcb.event_base, gcb.listen_socks[i], + EV_READ | EV_PERSIST, do_accept, 0); /* only signal process function priority higher than socket handler */ if (event_priority_set(&gcb.listen_events[i], 1)) @@ -2838,6 +2852,9 @@ process_term_signal(int sig,short event,void* arg) { closesocket(gcb.listen_socks[i]); } + + /* Clean up resources before exiting */ + gpfdist_cleanup(); _exit(1); } @@ -3913,7 +3930,10 @@ int gpfdist_init(int argc, const char* const argv[]) putenv("EVENT_SHOW_METHOD=1"); putenv("EVENT_NOKQUEUE=1"); - event_init(); + /* libevent 2.0+ */ + gcb.event_base = event_base_new(); + if (!gcb.event_base) + gfatal(NULL, "event_base_new failed"); signal_register(); http_setup(); @@ -3991,16 +4011,19 @@ int gpfdist_init(int argc, const char* const argv[]) int gpfdist_run() { - return event_dispatch(); + return event_base_dispatch(gcb.event_base); } #ifndef WIN32 int main(int argc, const char* const argv[]) { + int ret; if (gpfdist_init(argc, argv) == -1) gfatal(NULL, "Initialization failed"); - return gpfdist_run(); + ret = gpfdist_run(); + gpfdist_cleanup(); + return ret; } @@ -4175,6 +4198,7 @@ int main(int argc, const char* const argv[]) if (gpfdist_init(argc, argv) == -1) gfatal(NULL, "Initialization failed"); main_ret = gpfdist_run(); + gpfdist_cleanup(); } @@ -4264,6 +4288,7 @@ void ServiceMain(int argc, char** argv) * actual service work */ gpfdist_run(); + gpfdist_cleanup(); } void ControlHandler(DWORD request) @@ -4566,7 +4591,7 @@ static void flush_ssl_buffer(int fd, short event, void* arg) static void setup_flush_ssl_buffer(request_t* r) { event_del(&r->ev); - event_set(&r->ev, r->sock, EV_WRITE, flush_ssl_buffer, r); + event_assign(&r->ev, gcb.event_base, r->sock, EV_WRITE, flush_ssl_buffer, r); r->tm.tv_sec = 5; r->tm.tv_usec = 0; (void)event_add(&r->ev, &r->tm); @@ -4678,7 +4703,7 @@ static void request_cleanup(request_t *r) static void setup_do_close(request_t* r) { event_del(&r->ev); - event_set(&r->ev, r->sock, EV_READ, do_close, r); + event_assign(&r->ev, gcb.event_base, r->sock, EV_READ, do_close, r); r->tm.tv_sec = 60; r->tm.tv_usec = 0; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
