On 05/01/2010 10:56 PM, Pete Zaitcev wrote:
On Sat, 01 May 2010 18:28:42 -0400
Jeff Garzik <[email protected]> wrote:

As I write this email, I am borrowing a lot of networking code from
tabled to convert from GNet to the more flexible TCP server
codebase found in tabled -- notably its asynchronous background TCP
writing code.  Hopefully I will finish and commit this by the
end of the weekend.

This seems to be crying out for a common repository, or something like
libhail; I'm not sure what. Remember the timer case. Eventually we'll make
changes to tabled that itd will need to copy. But I don't know which course
is best.

I was definitely thinking along those lines when I abstracted and modularized the code a bit. See attached... I put all the TCP write-related code into two structures, tcp_write_state and tcp_write. The code received s/cli_wr/tcp_wr/g and other obvious, cosmetic changes.

libhail definitely seems like the direction to go. It would be easiest from a packaging perspective to put it into CLD, but maybe it deserves its own repo?

        Jeff



====================================SNIP CUT HERE SNIP=========================

/*
 * Tunables for the TCP write queue.  Both values are placeholders that
 * still need to be chosen properly.
 */
enum {
        /* most queued buffers gathered into a single writev() call */
        TCP_MAX_WR_IOV  = 512,

        /* initial value of tcp_write_state.write_cnt_max */
        TCP_MAX_WR_CNT  = 10000,
};

/*
 * Per-connection state for the asynchronous TCP writer.
 * Initialize with tcp_write_init(); release with tcp_write_exit().
 */
struct tcp_write_state {
        int                     fd;             /* socket handed to writev() */
        struct list_head        write_q;        /* pending struct tcp_write, in queue order */
        struct list_head        write_compl_q;  /* fully written, callback not yet run */
        size_t                  write_cnt;      /* water level: bytes of requests
                                                 * queued and not yet freed */
        size_t                  write_cnt_max;  /* high-water mark of write_cnt,
                                                 * starting at TCP_MAX_WR_CNT.
                                                 * NOTE(review): nothing here
                                                 * enforces it as a limit */
        bool                    writing;        /* true while write_ev is added */
        struct event            write_ev;       /* persistent EV_WRITE on fd */

        void                    *priv;          /* usable by any app */

        /* stats */
        unsigned long           opt_write;      /* tcp_write_start() calls whose
                                                 * immediate write emptied write_q */
};

/*
 * One queued write request.  tcp_writeq() stores buf without copying it,
 * so the buffer must stay valid until the request is freed.
 */
struct tcp_write {
        const void              *buf;           /* next byte to send; advanced
                                                 * as data is written */
        int                     togo;           /* bytes of buf not yet written */

        int                     length;         /* original size; subtracted from
                                                 * write_cnt when freed */

                                                /* callback, run when the request
                                                 * is freed: done is true if it
                                                 * was fully written, false if
                                                 * discarded; the return value is
                                                 * OR'ed into tcp_write_run_compl() */
        bool                    (*cb)(struct tcp_write_state *, void *, bool);
        void                    *cb_data;       /* data passed to cb */

        struct list_head        node;           /* link on write_q or write_compl_q */
};

/* Queue buflen bytes at buf (not copied) with an optional completion cb. */
extern int tcp_writeq(struct tcp_write_state *st, const void *buf,
                      unsigned int buflen,
                      bool (*cb)(struct tcp_write_state *, void *, bool),
                      void *cb_data);
/* Stock completion callback: free(cb_data), return false. */
extern bool tcp_wr_cb_free(struct tcp_write_state *st, void *cb_data,
                           bool done);
/* Run and free every completed request; true if any callback returned true. */
extern bool tcp_write_run_compl(struct tcp_write_state *st);
/* Bytes of requests queued and not yet freed. */
extern size_t tcp_wqueued(struct tcp_write_state *st);
extern void tcp_write_init(struct tcp_write_state *st, int fd);
extern void tcp_write_exit(struct tcp_write_state *st);
/* Kick the writer: true = caller may keep looping, false = wait for poll. */
extern bool tcp_write_start(struct tcp_write_state *st);



====================================SNIP CUT HERE SNIP=========================



/*
 * Move a fully written request from write_q onto write_compl_q.  Its
 * callback is deliberately not invoked here; tcp_write_run_compl() does
 * that later.
 */
static void tcp_write_complete(struct tcp_write_state *st,
                               struct tcp_write *wr)
{
        struct list_head *link = &wr->node;

        list_del(link);
        list_add_tail(link, &st->write_compl_q);
}

/*
 * Stock completion callback: release cb_data with free(3).
 * st and done are ignored; always returns false.
 */
bool tcp_wr_cb_free(struct tcp_write_state *st, void *cb_data, bool done)
{
        (void) st;
        (void) done;

        free(cb_data);
        return false;
}

/*
 * Retire one request: drop its bytes from the water level, unlink it,
 * run its callback (if any) with `done`, then free the request itself
 * (not its buffer).  Returns the callback's result, or false if there
 * is no callback.
 */
static bool tcp_write_free(struct tcp_write_state *st, struct tcp_write *tmp,
                           bool done)
{
        bool want_loop;

        st->write_cnt -= tmp->length;
        list_del(&tmp->node);

        want_loop = tmp->cb ? tmp->cb(st, tmp->cb_data, done) : false;

        free(tmp);
        return want_loop;
}

/* Free every request on the list at head, passing `done` to each callback. */
static void tcp_write_free_list(struct tcp_write_state *st,
                                struct list_head *head, bool done)
{
        struct tcp_write *cur, *next;

        list_for_each_entry_safe(cur, next, head, node) {
                tcp_write_free(st, cur, done);
        }
}

/*
 * Discard all requests.  Completed ones are freed first with done == true,
 * then still-pending ones with done == false.  Callback return values are
 * ignored.
 */
static void tcp_write_free_all(struct tcp_write_state *st)
{
        tcp_write_free_list(st, &st->write_compl_q, true);
        tcp_write_free_list(st, &st->write_q, false);
}

/*
 * Run callbacks for, and free, every request on the completion queue.
 * Returns true if any callback returned true.
 */
bool tcp_write_run_compl(struct tcp_write_state *st)
{
        bool again = false;

        /* tcp_write_free() unlinks each entry, so draining from the head
         * terminates once the queue is empty */
        while (!list_empty(&st->write_compl_q)) {
                struct tcp_write *done_wr =
                        list_entry(st->write_compl_q.next, struct tcp_write,
                                   node);

                if (tcp_write_free(st, done_wr, true))
                        again = true;
        }

        return again;
}

/*
 * Push as much queued data as the socket will take.
 *
 * Gathers up to TCP_MAX_WR_IOV pending buffers from st->write_q into an
 * iovec and issues one writev().  Requests that become fully written are
 * moved to st->write_compl_q.  Their callbacks are NOT run here, because
 * doing so could recurse back into the writer; tcp_write_run_compl() runs
 * them.  When write_q drains, the write event is removed.
 *
 * Returns true if the socket is still usable (including "would block").
 * Returns false on a write error.  In that case the write event has been
 * removed and every request has been freed.
 */
static bool tcp_writable(struct tcp_write_state *st)
{
        int n_iov;
        struct tcp_write *tmp;
        ssize_t rc;
        struct iovec iov[TCP_MAX_WR_IOV];

        /* accumulate pending writes into iovec */
        n_iov = 0;
        list_for_each_entry(tmp, &st->write_q, node) {
                if (n_iov == TCP_MAX_WR_IOV)
                        break;
                /* iov_base is not const-qualified, but writev() only reads it */
                iov[n_iov].iov_base = (void *) tmp->buf;
                iov[n_iov].iov_len = tmp->togo;
                n_iov++;
        }

        /* execute non-blocking write, retrying if interrupted by a signal */
do_write:
        rc = writev(st->fd, iov, n_iov);
        if (rc < 0) {
                if (errno == EINTR)
                        goto do_write;
                /* POSIX permits either errno for a full non-blocking socket */
                if (errno != EAGAIN && errno != EWOULDBLOCK)
                        goto err_out;
                return true;
        }

        /* iterate through write queue, consuming requests in order based on
         * the amount of data written
         */
        while (rc > 0 && !list_empty(&st->write_q)) {
                int sz;

                /* get pointer to first record on list */
                tmp = list_entry(st->write_q.next, struct tcp_write, node);

                /* mark data consumed by decreasing tmp->togo */
                sz = (tmp->togo < rc) ? tmp->togo : (int) rc;
                tmp->togo -= sz;
                /* step by bytes: arithmetic on void * is not ISO C */
                tmp->buf = (const char *) tmp->buf + sz;
                rc -= sz;

                /* if tmp->togo reaches zero, the write is complete,
                 * so schedule it for clean up (cannot call callback
                 * right away or an endless recursion will result)
                 */
                if (tmp->togo == 0)
                        tcp_write_complete(st, tmp);
        }

        /* if we emptied the queue, clear write notification */
        if (list_empty(&st->write_q)) {
                st->writing = false;
                if (event_del(&st->write_ev) < 0)
                        goto err_out;
        }

        return true;

err_out:
        /* stop EV_WRITE from firing again for a queue that is about to be
         * discarded */
        if (st->writing) {
                st->writing = false;
                event_del(&st->write_ev);
        }
        tcp_write_free_all(st);
        return false;
}

/*
 * Kick the writer after data has been queued.
 *
 * Attempts an immediate ("optimistic") write.  If data remains, it
 * registers the write event so the rest is sent when the socket becomes
 * writable.
 *
 * Returns true ("loop, not poll") when write_q is empty afterwards, or
 * when event_add() fails (data is still queued and the caller may retry).
 * Returns false ("poll wait") when the write event is active.
 */
bool tcp_write_start(struct tcp_write_state *st)
{
        bool ok;

        if (list_empty(&st->write_q))
                return true;            /* loop, not poll */

        /* if write-poll already active, nothing further to do */
        if (st->writing)
                return false;           /* poll wait */

        /* attempt optimistic write, in hopes of avoiding poll,
         * or at least refill the write buffers so as to not
         * get -immediately- called again by the kernel
         */
        ok = tcp_writable(st);
        if (list_empty(&st->write_q)) {
                /* a failed write also empties the queue (requests are
                 * discarded); only count genuine optimistic successes */
                if (ok)
                        st->opt_write++;
                return true;            /* loop, not poll */
        }

        if (event_add(&st->write_ev, NULL) < 0)
                return true;            /* loop, not poll */

        st->writing = true;

        return false;                   /* poll wait */
}

/*
 * Append a write request to st's queue.  This only queues; call
 * tcp_write_start() to begin sending.
 *
 * buf is not copied and must stay valid until the request is freed.  cb
 * (may be NULL) is called with cb_data when the request is freed.
 *
 * Returns 0 on success.  Returns -EINVAL if buf is NULL, buflen is 0, or
 * buflen does not fit in an int.  Returns -ENOMEM on allocation failure.
 * On failure cb is not called and the caller still owns buf and cb_data.
 */
int tcp_writeq(struct tcp_write_state *st, const void *buf, unsigned int buflen,
               bool (*cb)(struct tcp_write_state *, void *, bool),
               void *cb_data)
{
        struct tcp_write *wr;

        if (!buf || !buflen)
                return -EINVAL;

        /* togo/length are int: reject anything above INT_MAX
         * (UINT_MAX >> 1) rather than store a negative size */
        if (buflen > (~0U >> 1))
                return -EINVAL;

        wr = calloc(1, sizeof(*wr));
        if (!wr)
                return -ENOMEM;

        wr->buf = buf;
        wr->togo = (int) buflen;
        wr->length = (int) buflen;
        wr->cb = cb;
        wr->cb_data = cb_data;
        list_add_tail(&wr->node, &st->write_q);

        /* update water level and its high-water mark */
        st->write_cnt += buflen;
        if (st->write_cnt > st->write_cnt_max)
                st->write_cnt_max = st->write_cnt;

        return 0;
}

/*
 * Report the water level: bytes belonging to requests that were queued
 * and not yet freed.  This includes written requests still waiting on
 * the completion queue.
 */
size_t tcp_wqueued(struct tcp_write_state *st)
{
        size_t level = st->write_cnt;

        return level;
}

/*
 * Callback for st->write_ev: the socket is writable, so push more queued
 * data.  fd and events are unused.  tcp_writable()'s result is ignored,
 * and completion callbacks are not run here.
 */
static void tcp_wr_evt(int fd, short events, void *userdata)
{
        (void) fd;
        (void) events;

        tcp_writable((struct tcp_write_state *) userdata);
}

/*
 * Prepare st for writing to fd.  Zeroes every field, creates empty pending
 * and completion queues, and seeds write_cnt_max with TCP_MAX_WR_CNT.  It
 * also configures, but does not add, a persistent EV_WRITE event on fd;
 * tcp_write_start() adds it when needed.
 */
void tcp_write_init(struct tcp_write_state *st, int fd)
{
        memset(st, 0, sizeof(*st));

        INIT_LIST_HEAD(&st->write_q);
        INIT_LIST_HEAD(&st->write_compl_q);

        st->fd = fd;
        st->write_cnt_max = TCP_MAX_WR_CNT;

        event_set(&st->write_ev, st->fd, EV_WRITE | EV_PERSIST,
                  tcp_wr_evt, st);
}

/*
 * Tear down st.  Removes the write event if it is active, then frees every
 * request.  Completed requests get done == true; unwritten requests get
 * done == false.  writing is cleared so the state stays consistent if st
 * is reused.
 */
void tcp_write_exit(struct tcp_write_state *st)
{
        if (st->writing) {
                event_del(&st->write_ev);
                /* otherwise a later tcp_write_start() would report
                 * "poll wait" for an event that is no longer registered */
                st->writing = false;
        }

        tcp_write_free_all(st);
}

====================================SNIP CUT HERE SNIP=========================

Reply via email to