On Sun, Nov 01, 2020 at 12:34:52AM +0100, g...@suckless.org wrote: > commit dff98c0bcaef7be220c563ebaebd66f8c6704197 > Author: Laslo Hunhold <d...@frign.de> > AuthorDate: Sun Nov 1 00:27:46 2020 +0100 > Commit: Laslo Hunhold <d...@frign.de> > CommitDate: Sun Nov 1 00:27:46 2020 +0100 > > Use epoll/kqueue and worker threads to handle connections > > This adds quite a bit of code, but is the culmination of the previous > restructurings. Each worker thread has a connection pool and the > interesting part is that it's 100% nonblocking. If reading or writing > blocks at any point, the worker thread can just drop it and continue > with something else. This is especially powerful against attacks like > slow loris, which cannot be caught with a forking model and could easily > be used in a DoS against a quark instance. > > There are no memory allocations at runtime, unless you use dirlistings, > whose libc-allocations you can't work around. > > In case the connection pool is exhausted due to a lot of slow lorises, > we still hit a DoS, but at least it is now possible to inspect the > connection pool and just drop another connection that can be > heuristically assessed as a "malicious" one (e.g. many connections from > one client, long time in one state or something using a monotonic > clock). > > Given we still sadly don't have kqueue in Linux, which is 1000x > better than epoll, which is deeply flawed, I wrote a very thin wrapper > in queue.{c,h} which exposes the necessary functions in a common > interface. 
> > Signed-off-by: Laslo Hunhold <d...@frign.de> > > diff --git a/Makefile b/Makefile > index 548e6aa..da0e458 100644 > --- a/Makefile > +++ b/Makefile > @@ -4,13 +4,13 @@ > > include config.mk > > -COMPONENTS = data http sock util > +COMPONENTS = data http queue sock util > > all: quark > > data.o: data.c data.h http.h util.h config.mk > http.o: http.c config.h http.h util.h config.mk > -main.o: main.c arg.h data.h http.h sock.h util.h config.mk > +main.o: main.c arg.h data.h http.h queue.h sock.h util.h config.mk > sock.o: sock.c sock.h util.h config.mk > util.o: util.c util.h config.mk > > diff --git a/config.mk b/config.mk > index 7056241..cedb3e7 100644 > --- a/config.mk > +++ b/config.mk > @@ -10,7 +10,7 @@ MANPREFIX = $(PREFIX)/share/man > # flags > CPPFLAGS = -DVERSION=\"$(VERSION)\" -D_DEFAULT_SOURCE -D_XOPEN_SOURCE=700 > -D_BSD_SOURCE > CFLAGS = -std=c99 -pedantic -Wall -Wextra -Os > -LDFLAGS = -s > +LDFLAGS = -lpthread -s > > # compiler and linker > CC = cc > diff --git a/main.c b/main.c > index d64774b..e26cc77 100644 > --- a/main.c > +++ b/main.c > @@ -3,6 +3,7 @@ > #include <grp.h> > #include <limits.h> > #include <netinet/in.h> > +#include <pthread.h> > #include <pwd.h> > #include <regex.h> > #include <signal.h> > @@ -19,6 +20,7 @@ > #include "arg.h" > #include "data.h" > #include "http.h" > +#include "queue.h" > #include "sock.h" > #include "util.h" > > @@ -48,16 +50,20 @@ logmsg(const struct connection *c) > } > > static void > -serve(struct connection *c, const struct server *srv) > +close_connection(struct connection *c) > +{ > + if (c != NULL) { > + close(c->fd); > + memset(c, 0, sizeof(*c)); > + } > +} > + > +static void > +serve_connection(struct connection *c, const struct server *srv) > { > enum status s; > int done; > > - /* set connection timeout */ > - if (sock_set_timeout(c->fd, 30)) { > - warn("sock_set_timeout: Failed"); > - } > - > switch (c->state) { > case C_VACANT: > /* > @@ -145,12 +151,212 @@ response: > } > err: > logmsg(c); > + 
close_connection(c); > +} > + > +struct connection * > +accept_connection(int insock, struct connection *connection, > + size_t nslots) > +{ > + struct connection *c = NULL; > + size_t j; > + > + /* find vacant connection (i.e. one with no fd assigned to it) */ > + for (j = 0; j < nslots; j++) { > + if (connection[j].fd == 0) { > + c = &connection[j]; > + break; > + } > + } > + if (j == nslots) { > + /* nothing available right now, return without accepting */ > + > + /* > + * NOTE: This is currently still not the best option, but > + * at least we now have control over it and can reap a > + * connection from our pool instead of previously when > + * we were forking and were more or less on our own in > + * each process > + */ > + return NULL; > + } > + > + /* accept connection */ > + if ((c->fd = accept(insock, (struct sockaddr *)&c->ia, > + &(socklen_t){sizeof(c->ia)})) < 0) { > + if (errno != EAGAIN && errno != EWOULDBLOCK) { > + /* not much we can do here */ > + warn("accept:"); > + } > + return NULL; > + } > + > + /* set socket to non-blocking mode */ > + if (sock_set_nonblocking(c->fd)) { > + /* we can't allow blocking sockets */ > + return NULL; > + } > + > + return c; > +} > + > +struct worker_data { > + int insock; > + size_t nslots; > + const struct server *srv; > +}; > + > +static void * > +thread_method(void *data) > +{ > + queue_event *event = NULL; > + struct connection *connection, *c; > + struct worker_data *d = (struct worker_data *)data; > + int qfd, nready, fd; > + size_t i; > + > + /* allocate connections */ > + if (!(connection = calloc(d->nslots, sizeof(*connection)))) { > + die("calloc:"); > + } > + > + /* create event queue */ > + if ((qfd = queue_create()) < 0) { > + exit(1); > + } > + > + /* add insock to the interest list */ > + if (queue_add_fd(qfd, d->insock, QUEUE_EVENT_IN, 1, NULL) < 0) { > + exit(1); > + } > + > + /* allocate event array */ > + if (!(event = reallocarray(event, d->nslots, sizeof(*event)))) { > + die("reallocarray:"); 
> + } > + > + for (;;) { > + /* wait for new activity */ > + if ((nready = queue_wait(qfd, event, d->nslots)) < 0) { > + exit(1); > + } > + > + /* handle events */ > + for (i = 0; i < (size_t)nready; i++) { > + if (event[i].events & (EPOLLERR | EPOLLHUP)) { > + fd = queue_event_get_fd(&event[i]); > + > + if (fd != d->insock) { > + memset(queue_event_get_ptr(&event[i]), > + 0, sizeof(struct connection)); > + } > + > + printf("dropped a connection\n"); > + > + continue; > + } > + > + if (queue_event_get_fd(&event[i]) == d->insock) { > + /* add new connection to the interest list */ > + if (!(c = accept_connection(d->insock, > + connection, > + d->nslots))) { > + /* > + * the socket is either blocking > + * or something failed. > + * In both cases, we just carry on > + */ > + continue; > + } > > - /* clean up and finish */ > - shutdown(c->fd, SHUT_RD); > - shutdown(c->fd, SHUT_WR); > - close(c->fd); > - c->state = C_VACANT; > + /* > + * add event to the interest list > + * (we want IN, because we start > + * with receiving the header) > + */ > + if (queue_add_fd(qfd, c->fd, > + QUEUE_EVENT_IN, > + 0, c) < 0) { > + /* not much we can do here */ > + continue; > + } > + } else { > + c = queue_event_get_ptr(&event[i]); > + > + /* serve existing connection */ > + serve_connection(c, d->srv); > + > + if (c->fd == 0) { > + /* we are done */ > + continue; > + } > + > + /* > + * rearm the event based on the state > + * we are "stuck" at > + */ > + switch(c->state) { > + case C_RECV_HEADER: > + if (queue_mod_fd(qfd, c->fd, > + QUEUE_EVENT_IN, > + c) < 0) { > + close_connection(c); > + break; > + } > + break; > + case C_SEND_HEADER: > + case C_SEND_BODY: > + if (queue_mod_fd(qfd, c->fd, > + QUEUE_EVENT_OUT, > + c) < 0) { > + close_connection(c); > + break; > + } > + break; > + default: > + break; > + } > + } > + } > + } > + > + return NULL; > +} > + > +static void > +handle_connections(int *insock, size_t nthreads, size_t nslots, > + const struct server *srv) > +{ > + pthread_t 
*thread = NULL; > + struct worker_data *d = NULL; > + size_t i; > + > + /* allocate worker_data structs */ > + if (!(d = reallocarray(d, nthreads, sizeof(*d)))) { > + die("reallocarray:"); > + } > + for (i = 0; i < nthreads; i++) { > + d[i].insock = insock[i]; > + d[i].nslots = nslots; > + d[i].srv = srv; > + } > + > + /* allocate and initialize thread pool */ > + if (!(thread = reallocarray(thread, nthreads, sizeof(*thread)))) { > + die("reallocarray:"); > + } > + for (i = 0; i < nthreads; i++) { > + if (pthread_create(&thread[i], NULL, thread_method, &d[i]) != > 0) { > + die("pthread_create:"); > + } > + } > + > + /* wait for threads */ > + for (i = 0; i < nthreads; i++) { > + if ((errno = pthread_join(thread[i], NULL))) { > + warn("pthread_join:"); > + } > + } > } > > static void > @@ -274,12 +480,13 @@ main(int argc, char *argv[]) > .docindex = "index.html", > }; > size_t i; > - int insock, status = 0; > + int *insock = NULL, status = 0; > const char *err; > char *tok[4]; > > /* defaults */ > - int maxnprocs = 512; > + size_t nthreads = 4; > + size_t nslots = 64; > char *servedir = "."; > char *user = "nobody"; > char *group = "nogroup"; > @@ -308,15 +515,23 @@ main(int argc, char *argv[]) > usage(); > } > if (!(srv.map = reallocarray(srv.map, ++srv.map_len, > - sizeof(struct map)))) { > + sizeof(struct map)))) { > die("reallocarray:"); > } > srv.map[srv.map_len - 1].from = tok[0]; > srv.map[srv.map_len - 1].to = tok[1]; > srv.map[srv.map_len - 1].chost = tok[2]; > break; > - case 'n': > - maxnprocs = strtonum(EARGF(usage()), 1, INT_MAX, &err); > + case 's': > + err = NULL; > + nslots = strtonum(EARGF(usage()), 1, INT_MAX, &err); > + if (err) { > + die("strtonum '%s': %s", EARGF(usage()), err); > + } > + break; > + case 't': > + err = NULL; > + nthreads = strtonum(EARGF(usage()), 1, INT_MAX, &err); > if (err) { > die("strtonum '%s': %s", EARGF(usage()), err); > } > @@ -371,8 +586,8 @@ main(int argc, char *argv[]) > } > } > > - /* raise the process limit */ > - 
rlim.rlim_cur = rlim.rlim_max = maxnprocs; > + /* raise the process limit (2 + nthreads) */ > + rlim.rlim_cur = rlim.rlim_max = 2 + nthreads; > if (setrlimit(RLIMIT_NPROC, &rlim) < 0) { > die("setrlimit RLIMIT_NPROC:"); > } > @@ -394,9 +609,20 @@ main(int argc, char *argv[]) > > handlesignals(sigcleanup); > > - /* bind socket */ > - insock = udsname ? sock_get_uds(udsname, pwd->pw_uid, grp->gr_gid) : > - sock_get_ips(srv.host, srv.port); > + /* create a nonblocking listening socket for each thread */ > + if (!(insock = reallocarray(insock, nthreads, sizeof(*insock)))) { > + die("reallocarray:"); > + } > + if (udsname ? sock_get_uds_arr(udsname, pwd->pw_uid, grp->gr_gid, > + insock, nthreads) : > + sock_get_ips_arr(srv.host, srv.port, insock, nthreads)) { > + return 1; > + } > + for (i = 0; i < nthreads; i++) { > + if (sock_set_nonblocking(insock[i])) { > + return 1; > + } > + } > > switch (fork()) { > case -1: > @@ -410,6 +636,9 @@ main(int argc, char *argv[]) > if (signal(SIGCHLD, SIG_IGN) == SIG_ERR) { > die("signal: Failed to set SIG_IGN on SIGCHLD"); > } > + if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) { > + die("signal: Failed to set SIG_IGN on SIGPIPE"); > + } > > /* limit ourselves to reading the servedir and block further > unveils */ > eunveil(servedir, "r"); > @@ -448,31 +677,8 @@ main(int argc, char *argv[]) > } > > /* accept incoming connections */ > - while (1) { > - struct connection c = { 0 }; > + handle_connections(insock, nthreads, nslots, &srv); > > - if ((c.fd = accept(insock, (struct sockaddr *)&c.ia, > - &(socklen_t){sizeof(c.ia)})) < 0) { > - warn("accept:"); > - continue; > - } > - > - /* fork and handle */ > - switch (fork()) { > - case 0: > - do { > - serve(&c, &srv); > - } while (c.state != C_VACANT); > - exit(0); > - break; > - case -1: > - warn("fork:"); > - /* fallthrough */ > - default: > - /* close the connection in the parent */ > - close(c.fd); > - } > - } > exit(0); > default: > /* limit ourselves even further while we are waiting */ > 
diff --git a/quark.1 b/quark.1 > index 6e0e5f8..d752cc7 100644 > --- a/quark.1 > +++ b/quark.1 > @@ -1,4 +1,4 @@ > -.Dd 2020-08-23 > +.Dd 2020-09-27 > .Dt QUARK 1 > .Os suckless.org > .Sh NAME > @@ -10,7 +10,8 @@ > .Op Fl h Ar host > .Op Fl u Ar user > .Op Fl g Ar group > -.Op Fl n Ar num > +.Op Fl s Ar num > +.Op Fl t Ar num > .Op Fl d Ar dir > .Op Fl l > .Op Fl i Ar file > @@ -21,7 +22,8 @@ > .Op Fl p Ar port > .Op Fl u Ar user > .Op Fl g Ar group > -.Op Fl n Ar num > +.Op Fl s Ar num > +.Op Fl t Ar num > .Op Fl d Ar dir > .Op Fl l > .Op Fl i Ar file > @@ -80,10 +82,6 @@ optionally limited to the canonical virtual host > If no virtual hosts are given, > .Pa chost > is ignored. > -.It Fl n Ar num > -Set the maximum number of threads to > -.Ar num . > -The default is 512. > .It Fl p Ar port > In host mode, listen on port > .Ar port > @@ -96,6 +94,14 @@ redirects on non-standard ports. > Create the UNIX-domain socket > .Ar file , > listen on it for incoming connections and remove it on exit. > +.It Fl s Ar num > +Set the number of connection slots per worker thread to > +.Ar num . > +The default is 64. > +.It Fl t Ar num > +Set the number of worker threads to > +.Ar num . > +The default is 4. > .It Fl u Ar user > Set user ID when dropping privileges, > and in socket mode the user of the socket file, > diff --git a/queue.c b/queue.c > index a3b8e45..b8267c8 100644 > --- a/queue.c > +++ b/queue.c > @@ -22,7 +22,9 @@ queue_create(void) > warn("epoll_create1:"); > } > #else > - > + if ((qfd = kqueue()) < 0) { > + warn("kqueue:"); > + } > #endif > > return qfd; > @@ -78,7 +80,27 @@ queue_add_fd(int qfd, int fd, enum queue_event_type t, int > shared, > return 1; > } > #else > + kevent e; > + int events; > + > + /* prepare event flag */ > + event = (shared) ? 
0 : EV_CLEAR; > > + switch (t) { > + case QUEUE_EVENT_IN: > + events |= EVFILT_READ; > + break; > + case QUEUE_EVENT_OUT: > + events |= EVFILT_WRITE; > + break; > + } > + > + EV_SET(&e, fd, events, EV_ADD, 0, 0, 0); > + > + if (kevent(qfd, &e, 1, NULL, 0, NULL) < 0) { > + warn("kevent:"); > + return 1; > + } > #endif > > return 0; > @@ -90,8 +112,9 @@ queue_mod_fd(int qfd, int fd, enum queue_event_type t, > const void *data) > #ifdef __linux__ > struct epoll_event e; > > - /* set event flag */ > + /* set event flag (only for non-shared fd's) */ > e.events = EPOLLET; > + > switch (t) { > case QUEUE_EVENT_IN: > e.events |= EPOLLIN; > @@ -116,7 +139,26 @@ queue_mod_fd(int qfd, int fd, enum queue_event_type t, > const void *data) > return 1; > } > #else > + kevent e; > + int events; > + > + events = EV_CLEAR; > + > + switch (t) { > + case QUEUE_EVENT_IN: > + events |= EVFILT_READ; > + break; > + case QUEUE_EVENT_OUT: > + events |= EVFILT_WRITE; > + break; > + } > + > + EV_SET(&e, fd, events, EV_ADD, 0, 0, 0); > > + if (kevent(qfd, &e, 1, NULL, 0, NULL) < 0) { > + warn("kevent:"); > + return 1; > + } > #endif > > return 0; > @@ -133,7 +175,14 @@ queue_rem_fd(int qfd, int fd) > return 1; > } > #else > + kevent e; > > + EV_SET(&e, fd, 0, EV_DELETE, 0, 0, 0); > + > + if (kevent(qfd, &e, 1, NULL, 0, NULL) < 0) { > + warn("kevent:"); > + return 1; > + } > #endif > > return 0; > @@ -150,7 +199,10 @@ queue_wait(int qfd, queue_event *e, size_t elen) > return -1; > } > #else > - > + if ((nready = kevent(qfd, NULL, 0, e, elen, NULL) < 0) { > + warn("kevent:"); > + return 1; > + } > #endif > > return nready; > @@ -162,7 +214,7 @@ queue_event_get_fd(const queue_event *e) > #ifdef __linux__ > return e->data.fd; > #else > - > + return e->ident; > #endif > } > > @@ -172,6 +224,6 @@ queue_event_get_ptr(const queue_event *e) > #ifdef __linux__ > return e->data.ptr; > #else > - > + return e->udata; > #endif > } > diff --git a/sock.c b/sock.c > index c1fbd43..676f604 100644 > --- a/sock.c 
> +++ b/sock.c > @@ -1,5 +1,6 @@ > /* See LICENSE file for copyright and license details. */ > #include <arpa/inet.h> > +#include <fcntl.h> > #include <netdb.h> > #include <netinet/in.h> > #include <stddef.h> > @@ -16,7 +17,8 @@ > #include "util.h" > > int > -sock_get_ips(const char *host, const char* port) > +sock_get_ips_arr(const char *host, const char* port, int *sockfd, > + size_t sockfdlen) > { > struct addrinfo hints = { > .ai_flags = AI_NUMERICSERV, > @@ -24,39 +26,65 @@ sock_get_ips(const char *host, const char* port) > .ai_socktype = SOCK_STREAM, > }; > struct addrinfo *ai, *p; > - int ret, insock = 0; > + int r; > + size_t i, j; > > - if ((ret = getaddrinfo(host, port, &hints, &ai))) { > - die("getaddrinfo: %s", gai_strerror(ret)); > + if ((r = getaddrinfo(host, port, &hints, &ai))) { > + warn("getaddrinfo: %s", gai_strerror(r)); > + return 1; > } > > for (p = ai; p; p = p->ai_next) { > - if ((insock = socket(p->ai_family, p->ai_socktype, > - p->ai_protocol)) < 0) { > - continue; > - } > - if (setsockopt(insock, SOL_SOCKET, SO_REUSEADDR, > - &(int){1}, sizeof(int)) < 0) { > - die("setsockopt:"); > - } > - if (bind(insock, p->ai_addr, p->ai_addrlen) < 0) { > - if (close(insock) < 0) { > - die("close:"); > + /* try generating sockfds */ > + for (i = 0; i < sockfdlen; i++) { > + if ((sockfd[i] = socket(p->ai_family, p->ai_socktype, > + p->ai_protocol)) < 0) { > + /* retry with the next addrinfo */ > + break; > + } > + > + /* > + * set SO_REUSEPORT, so it becomes possible to bind > + * to the same port with multiple sockets, which > + * is what we're doing here > + */ > + if (setsockopt(sockfd[i], SOL_SOCKET, SO_REUSEPORT, > + &(int){1}, sizeof(int)) < 0) { > + warn("setsockopt:"); > + return 1; > + } > + > + if (bind(sockfd[i], p->ai_addr, p->ai_addrlen) < 0) { > + /* bind failed, close all previous fd's and > retry */ > + for (j = 0; j <= i; j++) { > + if (close(sockfd[i]) < 0) { > + warn("close:"); > + return 1; > + } > + } > + break; > } > - continue; > 
} > - break; > + if (i == sockfdlen) { > + /* we have generated all requested fds */ > + break; > + } > } > freeaddrinfo(ai); > if (!p) { > - die("bind:"); > + /* we exhaustet the addrinfo-list and found no connection */ > + warn("bind:"); > + return 1; > } > > - if (listen(insock, SOMAXCONN) < 0) { > - die("listen:"); > + for (i = 0; i < sockfdlen; i++) { > + if (listen(sockfd[i], SOMAXCONN) < 0) { > + warn("listen:"); > + return 1; > + } > } > > - return insock; > + return 0; > } > > void > @@ -68,44 +96,59 @@ sock_rem_uds(const char *udsname) > } > > int > -sock_get_uds(const char *udsname, uid_t uid, gid_t gid) > +sock_get_uds_arr(const char *udsname, uid_t uid, gid_t gid, int *sockfd, > + size_t sockfdlen) > { > struct sockaddr_un addr = { > .sun_family = AF_UNIX, > }; > - size_t udsnamelen; > + size_t udsnamelen, i; > int insock, sockmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | > S_IROTH | S_IWOTH; > > if ((insock = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { > - die("socket:"); > + warn("socket:"); > + return 1; > } > > if ((udsnamelen = strlen(udsname)) > sizeof(addr.sun_path) - 1) { > - die("UNIX-domain socket name truncated"); > + warn("UNIX-domain socket name truncated"); > + return 1; > } > memcpy(addr.sun_path, udsname, udsnamelen + 1); > > if (bind(insock, (const struct sockaddr *)&addr, sizeof(addr)) < 0) { > - die("bind '%s':", udsname); > + warn("bind '%s':", udsname); > + return 1; > } > > if (listen(insock, SOMAXCONN) < 0) { > sock_rem_uds(udsname); > - die("listen:"); > + warn("listen:"); > + return 1; > } > > if (chmod(udsname, sockmode) < 0) { > sock_rem_uds(udsname); > - die("chmod '%s':", udsname); > + warn("chmod '%s':", udsname); > + return 1; > } > > if (chown(udsname, uid, gid) < 0) { > sock_rem_uds(udsname); > - die("chown '%s':", udsname); > + warn("chown '%s':", udsname); > + return 1; > } > > - return insock; > + for (i = 0; i < sockfdlen; i++) { > + /* > + * we can't bind to an AF_UNIX socket more than once, > + * so we just reuse the 
same fd on all threads. > + */ > + sockfd[i] = insock; > + } > + > + return 0; > } > > int > @@ -125,6 +168,26 @@ sock_set_timeout(int fd, int sec) > return 0; > } > > +int > +sock_set_nonblocking(int fd) > +{ > + int flags; > + > + if ((flags = fcntl(fd, F_GETFL, 0)) < 0) { > + warn("fcntl:"); > + return 1; > + } > + > + flags |= O_NONBLOCK; > + > + if (fcntl(fd, F_SETFL, flags) < 0) { > + warn("fcntl:"); > + return 1; > + } > + > + return 0; > +} > + > int > sock_get_inaddr_str(const struct sockaddr_storage *in_sa, char *str, > size_t len) > diff --git a/sock.h b/sock.h > index 7084332..9834eda 100644 > --- a/sock.h > +++ b/sock.h > @@ -6,10 +6,11 @@ > #include <sys/socket.h> > #include <sys/types.h> > > -int sock_get_ips(const char *, const char *); > +int sock_get_ips_arr(const char *, const char *, int *, size_t); > void sock_rem_uds(const char *); > -int sock_get_uds(const char *, uid_t, gid_t); > +int sock_get_uds_arr(const char *, uid_t, gid_t, int *, size_t); > int sock_set_timeout(int, int); > +int sock_set_nonblocking(int); > int sock_get_inaddr_str(const struct sockaddr_storage *, char *, size_t); > > #endif /* SOCK_H */ >
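Hi, This does not work on OpenBSD and it does not compile: for example, the kqueue branch of queue.c declares `kevent e;` instead of `struct kevent e;`, assigns to an undeclared `event` in queue_add_fd(), and has an unbalanced parenthesis in queue_wait(), while main.c uses the Linux-only EPOLLERR/EPOLLHUP flags. -- Kind regards, Hiltjo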