Switch CLD from hand-rolled server poll code to libevent.  This follows
techniques and rationale similar to those of chunkd commit
c1aed7464f237e5a6309351bf003162c77d69e27, and reverts ancient commit
90b3b5edcf5aa00577f4395fdbb490ed7e9be824.

Signed-off-by: Jeff Garzik <jgar...@redhat.com>
---
 cld/Makefile.am |    3 -
 cld/cld.h       |   22 +++----
 cld/server.c    |  161 ++++++++++++++++++++------------------------------------
 cld/session.c   |   69 ++++++++++++++++--------
 4 files changed, 118 insertions(+), 137 deletions(-)

diff --git a/cld/Makefile.am b/cld/Makefile.am
index 9a13ce0..30eea0b 100644
--- a/cld/Makefile.am
+++ b/cld/Makefile.am
@@ -12,7 +12,8 @@ cld_SOURCES   = cldb.h cld.h \
                  cldb.c msg.c server.c session.c util.c
 cld_LDADD      = \
                  ../lib/libhail.la @GLIB_LIBS@ @CRYPTO_LIBS@ \
-                 @SSL_LIBS@ @DB4_LIBS@ @XML_LIBS@ @LIBCURL@
+                 @SSL_LIBS@ @DB4_LIBS@ @XML_LIBS@ @LIBCURL@ \
+                 @EVENT_LIBS@
 
 cldbadm_SOURCES        = cldb.h cldbadm.c
 cldbadm_LDADD  = @CRYPTO_LIBS@ @GLIB_LIBS@ @DB4_LIBS@
diff --git a/cld/cld.h b/cld/cld.h
index 4c0099f..17f14b8 100644
--- a/cld/cld.h
+++ b/cld/cld.h
@@ -22,8 +22,9 @@
 
 #include <netinet/in.h>
 #include <sys/time.h>
-#include <poll.h>
+#include <event.h>
 #include <glib.h>
+#include <elist.h>
 #include "cldb.h"
 #include <cld_msg_rpc.h>
 #include <cld_common.h>
@@ -59,13 +60,13 @@ struct session {
 
        uint64_t                last_contact;
        uint64_t                next_fh;
-       struct cld_timer        timer;
+       struct event            timer;
 
        uint64_t                next_seqid_in;
        uint64_t                next_seqid_out;
 
        GList                   *out_q;         /* outgoing pkts (to client) */
-       struct cld_timer        retry_timer;
+       struct event            retry_timer;
 
        char                    user[CLD_MAX_USERNAME];
 
@@ -85,10 +86,10 @@ struct server_stats {
        unsigned long           garbage;        /* num. garbage pkts dropped */
 };
 
-struct server_poll {
+struct server_socket {
        int                     fd;
-       bool                    (*cb)(int fd, short events, void *userdata);
-       void                    *userdata;
+       struct event            ev;
+       struct list_head        sockets_node;
 };
 
 struct server {
@@ -103,14 +104,13 @@ struct server {
 
        struct cldb             cldb;           /* database info */
 
-       GArray                  *polls;
-       GArray                  *poll_data;
+       struct event_base       *evbase_main;
 
-       GHashTable              *sessions;
+       struct list_head        sockets;
 
-       struct cld_timer_list   timers;
+       GHashTable              *sessions;
 
-       struct cld_timer        chkpt_timer;    /* db4 checkpoint timer */
+       struct event            chkpt_timer;    /* db4 checkpoint timer */
 
        struct server_stats     stats;          /* global statistics */
 };
diff --git a/cld/server.c b/cld/server.c
index 7a57785..aed501b 100644
--- a/cld/server.c
+++ b/cld/server.c
@@ -559,7 +559,7 @@ static void simple_sendresp(int sock_fd, const struct client *cli,
                       info->op);
 }
 
-static bool udp_srv_event(int fd, short events, void *userdata)
+static void udp_srv_event(int fd, short events, void *userdata)
 {
        struct client cli;
        char host[64];
@@ -586,7 +586,7 @@ static bool udp_srv_event(int fd, short events, void *userdata)
        rrc = recvmsg(fd, &hdr, 0);
        if (rrc < 0) {
                syslogerr("UDP recvmsg");
-               return true; /* continue main loop; do NOT terminate server */
+               return;
        }
        cli.addr_len = hdr.msg_namelen;
 
@@ -601,59 +601,60 @@ static bool udp_srv_event(int fd, short events, void *userdata)
 
        if (!parse_pkt_header(raw_pkt, rrc, &pkt, &hdr_len)) {
                cld_srv.stats.garbage++;
-               return true;
+               return;
        }
 
        if (!get_pkt_info(&pkt, raw_pkt, rrc, hdr_len, &info)) {
                xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
                cld_srv.stats.garbage++;
-               return true;
+               return;
        }
 
        if (packet_is_dupe(&info)) {
                /* silently drop dupes */
                xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-               return true;
+               return;
        }
 
        err = validate_pkt_session(&info, &cli);
        if (err) {
                simple_sendresp(fd, &cli, &info, err);
                xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-               return true;
+               return;
        }
 
        err = pkt_chk_sig(raw_pkt, rrc, &pkt);
        if (err) {
                simple_sendresp(fd, &cli, &info, err);
                xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-               return true;
+               return;
        }
 
        if (!(cld_srv.cldb.is_master && cld_srv.cldb.up)) {
                simple_sendmsg(fd, &cli, pkt.sid, pkt.user, 0xdeadbeef,
                               (xdrproc_t)xdr_void, NULL, CMO_NOT_MASTER);
                xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-               return true;
+               return;
        }
 
        err = udp_rx(fd, &cli, &info, raw_pkt, rrc);
        if (err) {
                simple_sendresp(fd, &cli, &info, err);
                xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-               return true;
+               return;
        }
        xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-       return true;
 }
 
 static void add_chkpt_timer(void)
 {
-       cld_timer_add(&cld_srv.timers, &cld_srv.chkpt_timer,
-                     time(NULL) + CLD_CHKPT_SEC);
+       struct timeval tv = { .tv_sec = CLD_CHKPT_SEC };
+
+       if (evtimer_add(&cld_srv.chkpt_timer, &tv) < 0)
+               HAIL_WARN(&srv_log, "chkpt timer add failed");
 }
 
-static void cldb_checkpoint(struct cld_timer *timer)
+static void cldb_checkpoint(int fd, short events, void *userdata)
 {
        DB_ENV *dbenv = cld_srv.cldb.env;
        int rc;
@@ -690,28 +691,28 @@ static int net_write_port(const char *port_file, const char *port_str)
 
 static void net_close(void)
 {
-       struct pollfd *pfd;
-       int i;
-
-       if (!cld_srv.polls)
-               return;
-
-       for (i = 0; i < cld_srv.polls->len; i++) {
-               pfd = &g_array_index(cld_srv.polls, struct pollfd, i);
-               if (pfd->fd >= 0) {
-                       if (close(pfd->fd) < 0)
-                               HAIL_WARN(&srv_log, "%s(%d): %s",
-                                         __func__, pfd->fd, strerror(errno));
-                       pfd->fd = -1;
+       struct server_socket *tmp, *iter;
+
+       list_for_each_entry_safe(tmp, iter, &cld_srv.sockets, sockets_node) {
+               if (tmp->fd >= 0) {
+                       if (event_del(&tmp->ev) < 0)
+                               HAIL_WARN(&srv_log, "Event delete(%d) failed",
+                                         tmp->fd);
+                       if (close(tmp->fd) < 0)
+                               HAIL_WARN(&srv_log, "Close(%d) failed: %s",
+                                         tmp->fd, strerror(errno));
+                       tmp->fd = -1;
                }
+
+               list_del(&tmp->sockets_node);
+               free(tmp);
        }
 }
 
 static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
                           int addr_len, void *addr_ptr)
 {
-       struct server_poll sp;
-       struct pollfd pfd;
+       struct server_socket *sock;
        int fd, rc;
 
        fd = socket(addr_fam, sock_type, sock_prot);
@@ -732,15 +733,25 @@ static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
                return -errno;
        }
 
-       sp.fd = fd;
-       sp.cb = udp_srv_event;
-       sp.userdata = NULL;
-       g_array_append_val(cld_srv.poll_data, sp);
+       sock = calloc(1, sizeof(*sock));
+       if (!sock) {
+               close(fd);
+               return -ENOMEM;
+       }
+
+       sock->fd = fd;
+       INIT_LIST_HEAD(&sock->sockets_node);
+
+       event_set(&sock->ev, fd, EV_READ | EV_PERSIST,
+                 udp_srv_event, sock);
+
+       if (event_add(&sock->ev, NULL) < 0) {
+               free(sock);
+               close(fd);
+               return -EIO;
+       }
 
-       pfd.fd = fd;
-       pfd.events = POLLIN;
-       pfd.revents = 0;
-       g_array_append_val(cld_srv.polls, pfd);
+       list_add_tail(&sock->sockets_node, &cld_srv.sockets);
 
        return fd;
 }
@@ -891,11 +902,13 @@ static void segv_signal(int signo)
 static void term_signal(int signo)
 {
        server_running = false;
+       event_loopbreak();
 }
 
 static void stats_signal(int signo)
 {
        dump_stats = true;
+       event_loopbreak();
 }
 
 #define X(stat) \
@@ -975,73 +988,16 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
 
 static int main_loop(void)
 {
-       time_t next_timeout;
-
-       next_timeout = cld_timers_run(&cld_srv.timers);
-
        while (server_running) {
-               struct pollfd *pfd;
-               int i, fired, rc;
-
                cld_srv.stats.poll++;
-
-               /* poll for fd activity, or next timer event */
-               rc = poll(&g_array_index(cld_srv.polls, struct pollfd, 0),
-                         cld_srv.polls->len,
-                         next_timeout ? (next_timeout * 1000) : -1);
-               if (rc < 0) {
-                       syslogerr("poll");
-                       if (errno != EINTR)
-                               break;
-               }
+               event_dispatch();
 
                gettimeofday(&current_time, NULL);
 
-               /* determine which fd's fired; call their callbacks */
-               fired = 0;
-               for (i = 0; i < cld_srv.polls->len; i++) {
-                       struct server_poll *sp;
-                       bool runrunrun;
-                       short revents;
-
-                       /* ref pollfd struct */
-                       pfd = &g_array_index(cld_srv.polls, struct pollfd, i);
-
-                       /* if no events fired, move on to next */
-                       if (!pfd->revents)
-                               continue;
-
-                       fired++;
-
-                       revents = pfd->revents;
-                       pfd->revents = 0;
-
-                       /* ref 1:1 matching server_poll struct */
-                       sp = &g_array_index(cld_srv.poll_data,
-                                           struct server_poll, i);
-
-                       cld_srv.stats.event++;
-
-                       /* call callback, shutting down server if requested */
-                       runrunrun = sp->cb(sp->fd, revents, sp->userdata);
-                       if (!runrunrun) {
-                               server_running = false;
-                               break;
-                       }
-
-                       /* if we reached poll(2) activity count, it is
-                        * pointless to continue looping
-                        */
-                       if (fired == rc)
-                               break;
-               }
-
                if (dump_stats) {
                        dump_stats = false;
                        stats_dump();
                }
-
-               next_timeout = cld_timers_run(&cld_srv.timers);
        }
 
        return 0;
@@ -1052,6 +1008,8 @@ int main (int argc, char *argv[])
        error_t aprc;
        int rc = 1;
 
+       INIT_LIST_HEAD(&cld_srv.sockets);
+
        /* isspace() and strcasecmp() consistency requires this */
        setlocale(LC_ALL, "C");
 
@@ -1075,6 +1033,8 @@ int main (int argc, char *argv[])
        if (use_syslog)
                openlog(PROGRAM_NAME, LOG_PID, LOG_LOCAL3);
 
+       cld_srv.evbase_main = event_init();
+
        if (!(cld_srv.flags & SFL_FOREGROUND) && (daemon(1, !use_syslog) < 0)) {
                syslogerr("daemon");
                goto err_out;
@@ -1103,17 +1063,13 @@ int main (int argc, char *argv[])
 
        ensure_root();
 
-       cld_timer_init(&cld_srv.chkpt_timer, "db4-checkpoint",
-                      cldb_checkpoint, NULL);
+       evtimer_set(&cld_srv.chkpt_timer, cldb_checkpoint, NULL);
        add_chkpt_timer();
 
        rc = 1;
 
        cld_srv.sessions = g_hash_table_new(sess_hash, sess_equal);
-       cld_srv.poll_data = g_array_sized_new(FALSE, FALSE,
-                                          sizeof(struct server_poll), 4);
-       cld_srv.polls = g_array_sized_new(FALSE,FALSE,sizeof(struct pollfd), 4);
-       if (!cld_srv.sessions || !cld_srv.poll_data || !cld_srv.polls)
+       if (!cld_srv.sessions)
                goto err_out_pid;
 
        if (sess_load(cld_srv.sessions) != 0)
@@ -1137,7 +1093,8 @@ int main (int argc, char *argv[])
        HAIL_INFO(&srv_log, "shutting down");
 
        if (strict_free)
-               cld_timer_del(&cld_srv.timers, &cld_srv.chkpt_timer);
+               if (evtimer_del(&cld_srv.chkpt_timer) < 0)
+                       HAIL_WARN(&srv_log, "chkpt timer del failed");
 
        if (cld_srv.cldb.up)
                cldb_down(&cld_srv.cldb);
@@ -1149,8 +1106,6 @@ err_out_pid:
 err_out:
        if (strict_free) {
                net_close();
-               g_array_free(cld_srv.polls, TRUE);
-               g_array_free(cld_srv.poll_data, TRUE);
                sessions_free();
                g_hash_table_unref(cld_srv.sessions);
        }
diff --git a/cld/session.c b/cld/session.c
index d76186b..9887aaa 100644
--- a/cld/session.c
+++ b/cld/session.c
@@ -43,8 +43,8 @@ struct session_outpkt {
        void                    *done_data;
 };
 
-static void session_retry(struct cld_timer *);
-static void session_timeout(struct cld_timer *);
+static void session_retry(int, short, void *);
+static void session_timeout(int, short, void *);
 static int sess_load_db(GHashTable *ss, DB_TXN *txn);
 static void op_unref(struct session_outpkt *op);
 
@@ -87,8 +87,8 @@ static struct session *session_new(void)
 
        cld_rand64(&sess->next_seqid_out);
 
-       cld_timer_init(&sess->timer, "session-timeout", session_timeout, sess);
-       cld_timer_init(&sess->retry_timer, "session-retry", session_retry, sess);
+       evtimer_set(&sess->timer, session_timeout, sess);
+       evtimer_set(&sess->retry_timer, session_retry, sess);
 
        return sess;
 }
@@ -103,8 +103,10 @@ static void session_free(struct session *sess, bool hash_remove)
        if (hash_remove)
                g_hash_table_remove(cld_srv.sessions, sess->sid);
 
-       cld_timer_del(&cld_srv.timers, &sess->timer);
-       cld_timer_del(&cld_srv.timers, &sess->retry_timer);
+       if (evtimer_del(&sess->timer) < 0)
+               HAIL_ERR(&srv_log, "sess timer del failed");
+       if (evtimer_del(&sess->retry_timer) < 0)
+               HAIL_ERR(&srv_log, "sess retry timer del failed");
 
        tmp = sess->out_q;
        while (tmp) {
@@ -376,9 +378,9 @@ static void session_ping_done(struct session_outpkt *outpkt)
        outpkt->sess->ping_open = false;
 }
 
-static void session_timeout(struct cld_timer *timer)
+static void session_timeout(int fd, short events, void *userdata)
 {
-       struct session *sess = timer->userdata;
+       struct session *sess = userdata;
        uint64_t sess_expire;
        int rc;
        DB_ENV *dbenv = cld_srv.cldb.env;
@@ -387,6 +389,8 @@ static void session_timeout(struct cld_timer *timer)
 
        sess_expire = sess->last_contact + CLD_SESS_TIMEOUT;
        if (!sess->dead && (sess_expire > now)) {
+               struct timeval tv;
+
                if (!sess->ping_open &&
                    (sess_expire > (sess->last_contact + (CLD_SESS_TIMEOUT / 2) &&
                    (sess->sock_fd > 0)))) {
@@ -396,9 +400,12 @@ static void session_timeout(struct cld_timer *timer)
                                     session_ping_done, NULL);
                }
 
-               cld_timer_add(&cld_srv.timers, &sess->timer,
-                             now + ((sess_expire - now) / 2) + 1);
-               return; /* timer added; do not time out session */
+               tv.tv_sec = ((sess_expire - now) / 2) + 1;
+               tv.tv_usec = 0;
+               if (evtimer_add(&sess->timer, &tv) < 0)
+                       HAIL_ERR(&srv_log, "timer add failed, sid " SIDFMT,
+                                SIDARG(sess->sid));
+               return;         /* timer added; do not time out session */
        }
 
        HAIL_INFO(&srv_log, "session %s, addr %s sid " SIDFMT,
@@ -554,25 +561,33 @@ static int sess_retry_output(struct session *sess, time_t *next_retry_out)
        return rc;
 }
 
-static void session_retry(struct cld_timer *timer)
+static void session_retry(int fd, short events, void *userdata)
 {
-       struct session *sess = timer->userdata;
+       struct session *sess = userdata;
        time_t next_retry;
+       time_t now = time(NULL);
+       struct timeval tv;
 
        if (!sess->out_q)
                return;
 
        sess_retry_output(sess, &next_retry);
 
-       cld_timer_add(&cld_srv.timers, &sess->retry_timer, next_retry);
+       tv.tv_sec = next_retry - now;
+       tv.tv_usec = 0;
+
+       if (evtimer_add(&sess->retry_timer, &tv) < 0)
+               HAIL_ERR(&srv_log, "retry timer re-add failed");
 }
 
 static void session_outq(struct session *sess, GList *new_pkts)
 {
        /* if out_q empty, start retry timer */
-       if (!sess->out_q)
-               cld_timer_add(&cld_srv.timers, &sess->retry_timer,
-                             time(NULL) + CLD_RETRY_START);
+       if (!sess->out_q) {
+               struct timeval tv = { .tv_sec = CLD_RETRY_START };
+               if (evtimer_add(&sess->retry_timer, &tv) < 0)
+                       HAIL_ERR(&srv_log, "retry timer start failed");
+       }
 
        sess->out_q = g_list_concat(sess->out_q, new_pkts);
 }
@@ -766,7 +781,8 @@ void msg_ack(struct session *sess, uint64_t seqid)
        }
 
        if (!sess->out_q)
-               cld_timer_del(&cld_srv.timers, &sess->retry_timer);
+               if (evtimer_del(&sess->retry_timer) < 0)
+                       HAIL_ERR(&srv_log, "sess retry timer del 2 failed");
 }
 
 void msg_new_sess(int sock_fd, const struct client *cli,
@@ -780,6 +796,7 @@ void msg_new_sess(int sock_fd, const struct client *cli,
        int rc;
        enum cle_err_codes resp_rc = CLE_OK;
        struct cld_msg_generic_resp resp;
+       struct timeval tv;
 
        sess = session_new();
        if (!sess) {
@@ -832,8 +849,10 @@ void msg_new_sess(int sock_fd, const struct client *cli,
        g_hash_table_insert(cld_srv.sessions, sess->sid, sess);
 
        /* begin session timer */
-       cld_timer_add(&cld_srv.timers, &sess->timer,
-                     time(NULL) + (CLD_SESS_TIMEOUT / 2));
+       tv.tv_sec = CLD_SESS_TIMEOUT / 2;
+       tv.tv_usec = 0;
+       if (evtimer_add(&sess->timer, &tv) < 0)
+               HAIL_ERR(&srv_log, "sess timer start failed");
 
        /* send new-sess reply */
        resp.code = CLE_OK;
@@ -933,6 +952,8 @@ static int sess_load_db(GHashTable *ss, DB_TXN *txn)
        val.flags = DB_DBT_USERMEM;
 
        while (1) {
+               struct timeval tv;
+
                rc = cur->get(cur, &key, &val, DB_NEXT);
                if (rc == DB_NOTFOUND)
                        break;
@@ -960,8 +981,12 @@ static int sess_load_db(GHashTable *ss, DB_TXN *txn)
                g_hash_table_insert(ss, sess->sid, sess);
 
                /* begin session timer */
-               cld_timer_add(&cld_srv.timers, &sess->timer,
-                             time(NULL) + (CLD_SESS_TIMEOUT / 2));
+               tv.tv_sec = CLD_SESS_TIMEOUT / 2;
+               tv.tv_usec = 0;
+               if (evtimer_add(&sess->timer, &tv) < 0) {
+                       HAIL_ERR(&srv_log, "sess timer loop start failed");
+                       break;
+               }
        }
 
        cur->close(cur);
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to