Just committed into libhail...  renamed the include to 'anet.h' for
'asynchronous networking'.

 include/Makefile.am |    2 
 include/anet.h      |  111 +++++++++++++++++++++++
 lib/Makefile.am     |    1 
 lib/atcp.c          |  241 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 354 insertions(+), 1 deletion(-)

commit 22de683a8f0566852818fac8b54ca26ae46490f0
Author: Jeff Garzik <j...@garzik.org>
Date:   Thu Sep 23 20:17:56 2010 -0400

    libhail: add async TCP network writing API, atcp_wr*
    
    Signed-off-by: Jeff Garzik <jgar...@redhat.com>

diff --git a/include/Makefile.am b/include/Makefile.am
index 234cf8a..967352a 100644
--- a/include/Makefile.am
+++ b/include/Makefile.am
@@ -5,5 +5,5 @@ EXTRA_DIST =            \
 
 include_HEADERS =      \
        cldc.h cld_common.h ncld.h chunkc.h chunk_msg.h         \
-       hail_log.h hstor.h
+       hail_log.h hstor.h anet.h
 
diff --git a/include/anet.h b/include/anet.h
new file mode 100644
index 0000000..5c216c7
--- /dev/null
+++ b/include/anet.h
@@ -0,0 +1,111 @@
+#ifndef __ANET_H__
+#define __ANET_H__
+
+/*
+ * Copyright 2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING.  If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <sys/time.h>
+#include <elist.h>
+
+enum {
+       ATCP_MAX_WR_IOV         = 32,           /* max iov per writev(2) */
+};
+
+typedef void (*atcp_ev_func)(int, short, void *);
+
+struct atcp_wr_ops {           /* event hooks; atcp passes wst->ev_info as the first argument */
+       int                     (*ev_wset)(void *, int, atcp_ev_func, void *); /* given fd, write callback, callback arg */
+       int                     (*ev_add)(void *, const struct timeval *); /* start write polling; atcp passes NULL; <0 = failure */
+       int                     (*ev_del)(void *); /* clear write notification; <0 = failure */
+};
+
+struct atcp_wr_state {
+       int                     fd;             /* our socket; -1 until atcp_wr_set_fd() */
+
+       bool                    writing;        /* actively trying to write? */
+
+       size_t                  write_cnt;      /* water level: octets currently queued */
+       size_t                  write_cnt_max;  /* highest write_cnt seen so far */
+
+       struct list_head        write_q;        /* list of async writes */
+       struct list_head        write_compl_q;  /* list of done writes */
+
+       void                    *priv;          /* untouched by atcp */
+
+       /* various statistics */
+       uint64_t                opt_write;      /* optimistic writes that emptied the queue */
+
+       const struct atcp_wr_ops *ops;          /* event hooks, set by atcp_wr_init() */
+       void                    *ev_info;       /* passed to ops->ev_* */
+};
+
+typedef bool (*atcp_write_func)(struct atcp_wr_state *, void *, bool);
+
+struct atcp_write {
+       const void              *buf;           /* write buffer pointer */
+       int                     togo;           /* write buffer remainder */
+
+       int                     length;         /* length for accounting */
+       atcp_write_func         cb;             /* callback */
+       void                    *cb_data;       /* data passed to cb */
+
+       struct atcp_wr_state    *wst;           /* our parent */
+
+       struct list_head        node;           /* write_[compl_]q list node */
+};
+
+/* setup and teardown atcp write state */
+extern void atcp_wr_exit(struct atcp_wr_state *wst);
+extern void atcp_wr_init(struct atcp_wr_state *wst,
+                         const struct atcp_wr_ops *ops, void *ev_info,
+                         void *priv);
+
+/* generic write callback that calls free(cb_data) */
+extern bool atcp_cb_free(struct atcp_wr_state *wst, void *cb_data, bool done);
+
+/* clear all write queues immediately, even if not complete */
+extern void atcp_write_free_all(struct atcp_wr_state *wst);
+
+/* complete all writes found on completion queue */
+extern bool atcp_write_run_compl(struct atcp_wr_state *wst);
+
+/* initialize internal fd, event setup */
+extern void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd);
+
+/* add a buffer to the write queue (buf is referenced, not copied) */
+extern int atcp_writeq(struct atcp_wr_state *wst, const void *buf,
+               unsigned int buflen, atcp_write_func cb, void *cb_data);
+
+/* begin pushing write queue to socket */
+extern bool atcp_write_start(struct atcp_wr_state *wst);
+
+/* is anything on the write queue at the moment? */
+static inline bool atcp_wq_empty(struct atcp_wr_state *wst)
+{
+       return list_empty(&wst->write_q) ? true : false;
+}
+
+/* total number of octets queued at this moment */
+static inline size_t atcp_wqueued(struct atcp_wr_state *wst)
+{
+       return wst->write_cnt;
+}
+
+#endif /* __ANET_H__ */
diff --git a/lib/Makefile.am b/lib/Makefile.am
index f7b27ff..616b881 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -21,6 +21,7 @@ LINK = $(LIBTOOL) --mode=link $(CC) $(CFLAGS) $(LDFLAGS) -o $@
 lib_LTLIBRARIES                = libhail.la
 
 libhail_la_SOURCES     =       \
+       atcp.c                  \
        cldc.c                  \
        cldc-udp.c              \
        cldc-dns.c              \
diff --git a/lib/atcp.c b/lib/atcp.c
new file mode 100644
index 0000000..dfdb954
--- /dev/null
+++ b/lib/atcp.c
@@ -0,0 +1,241 @@
+
+/*
+ * Copyright 2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING.  If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#include "hail-config.h"
+
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <sys/uio.h>
+#include <anet.h>
+
+bool atcp_cb_free(struct atcp_wr_state *wst, void *cb_data, bool done)
+{
+       free(cb_data);
+       return false;
+}
+
+static void atcp_write_complete(struct atcp_write *tmp)
+{
+       struct atcp_wr_state *wst = tmp->wst;
+
+       list_del(&tmp->node);
+       list_add_tail(&tmp->node, &wst->write_compl_q);
+}
+
+static bool atcp_write_free(struct atcp_write *tmp, bool done)
+{
+       struct atcp_wr_state *wst = tmp->wst;
+       bool rcb = false;
+
+       wst->write_cnt -= tmp->length;
+       list_del_init(&tmp->node);
+       if (tmp->cb)
+               rcb = tmp->cb(wst, tmp->cb_data, done);
+       free(tmp);
+
+       return rcb;
+}
+
+bool atcp_write_run_compl(struct atcp_wr_state *wst)
+{
+       struct atcp_write *first;
+       bool want_loop = false;
+
+       /* free every completed write; true if any callback returned true */
+       while (!list_empty(&wst->write_compl_q)) {
+               first = list_entry(wst->write_compl_q.next,
+                                  struct atcp_write, node);
+               want_loop |= atcp_write_free(first, true);
+       }
+       return want_loop;
+}
+
+void atcp_write_free_all(struct atcp_wr_state *wst)
+{
+       struct atcp_write *wr, *tmp;
+
+       atcp_write_run_compl(wst);
+       list_for_each_entry_safe(wr, tmp, &wst->write_q, node) {
+               atcp_write_free(wr, false);
+       }
+}
+
+/* push queued writes to wst->fd with a single writev(2); fully written
+ * entries move to write_compl_q.  returns false (after freeing every
+ * queued write) on a hard error, true otherwise
+ */
+static bool atcp_writable(struct atcp_wr_state *wst)
+{
+       int n_iov = 0;
+       int sz;
+       struct atcp_write *tmp;
+       ssize_t rc;
+       struct iovec iov[ATCP_MAX_WR_IOV];
+
+       /* accumulate pending writes into iovec */
+       list_for_each_entry(tmp, &wst->write_q, node) {
+               if (n_iov == ATCP_MAX_WR_IOV)
+                       break;
+               /* bleh, struct iovec should declare iov_base const */
+               iov[n_iov].iov_base = (void *) tmp->buf;
+               iov[n_iov].iov_len = tmp->togo;
+               n_iov++;
+       }
+
+       /* execute non-blocking write */
+do_write:
+       rc = writev(wst->fd, iov, n_iov);
+       if (rc < 0) {
+               if (errno == EINTR)
+                       goto do_write;
+               if (errno != EAGAIN && errno != EWOULDBLOCK)
+                       goto err_out;
+               return true;
+       }
+
+       /* issue completions based on the amount of data written */
+       while (rc > 0) {
+               /* get pointer to first record on list */
+               tmp = list_entry(wst->write_q.next, struct atcp_write, node);
+
+               /* mark data consumed by decreasing tmp->togo */
+               sz = (tmp->togo < rc) ? tmp->togo : rc;
+               tmp->togo -= sz;
+               tmp->buf = (const char *) tmp->buf + sz;
+               rc -= sz;
+
+               /* if tmp->togo reaches zero, write is complete,
+                * so schedule it for clean up (cannot call callback
+                * right away or an endless recursion will result)
+                */
+               if (tmp->togo == 0)
+                       atcp_write_complete(tmp);
+       }
+
+       /* if we emptied the queue, clear write notification */
+       if (atcp_wq_empty(wst)) {
+               wst->writing = false;
+               if (wst->ops->ev_del(wst->ev_info) < 0)
+                       goto err_out;
+       }
+
+       return true;
+
+err_out:
+       atcp_write_free_all(wst);
+       return false;
+}
+
+static void atcp_wr_event(int fd, short events, void *userdata)
+{
+       struct atcp_wr_state *wst = userdata;
+
+       atcp_writable(wst);
+       atcp_write_run_compl(wst);
+}
+
+void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd)
+{
+       wst->fd = fd;
+
+       wst->ops->ev_wset(wst->ev_info, wst->fd,
+                 atcp_wr_event, wst);
+}
+
+bool atcp_write_start(struct atcp_wr_state *wst)
+{
+       if (atcp_wq_empty(wst))
+               return true;            /* loop, not poll */
+
+       /* if write-poll already active, nothing further to do */
+       if (wst->writing)
+               return false;           /* poll wait */
+
+       /* attempt optimistic write, in hopes of avoiding poll,
+        * or at least refill the write buffers so as to not
+        * get -immediately- called again by the kernel
+        */
+       atcp_writable(wst);
+       if (atcp_wq_empty(wst)) {
+               wst->opt_write++;
+               return true;            /* loop, not poll */
+       }
+
+       if (wst->ops->ev_add(wst->ev_info, NULL) < 0)
+               return true;            /* loop, not poll */
+
+       wst->writing = true;
+
+       return false;                   /* poll wait */
+}
+
+/* queue buf (referenced, not copied); returns 0, -EINVAL or -ENOMEM.
+ * NOTE(review): buflen > INT_MAX would overflow the int togo/length fields */
+int atcp_writeq(struct atcp_wr_state *wst, const void *buf,
+               unsigned int buflen, atcp_write_func cb, void *cb_data)
+{
+       struct atcp_write *wr;
+
+       if (!buf || !buflen)
+               return -EINVAL;
+
+       wr = calloc(1, sizeof(*wr));
+       if (!wr)
+               return -ENOMEM;
+
+       wr->buf = buf;
+       wr->togo = wr->length = buflen;
+       wr->cb = cb;
+       wr->cb_data = cb_data;
+       wr->wst = wst;
+       list_add_tail(&wr->node, &wst->write_q);
+       wst->write_cnt += buflen;
+       if (wst->write_cnt > wst->write_cnt_max)
+               wst->write_cnt_max = wst->write_cnt;
+       return 0;
+}
+
+void atcp_wr_exit(struct atcp_wr_state *wst)
+{
+       if (!wst)
+               return;
+
+       if (wst->writing)
+               wst->ops->ev_del(wst->ev_info);
+       
+       atcp_write_free_all(wst);
+}
+
+void atcp_wr_init(struct atcp_wr_state *wst,
+                 const struct atcp_wr_ops *ops, void *ev_info,
+                 void *priv)
+{
+       memset(wst, 0, sizeof(*wst));
+
+       INIT_LIST_HEAD(&wst->write_q);
+       INIT_LIST_HEAD(&wst->write_compl_q);
+
+       wst->fd = -1;
+
+       wst->ops = ops;
+       wst->ev_info = ev_info;
+       wst->priv = priv;
+}
+
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to