Formatting got messed up on that.  Trying again:

+/* Compile-time limit on iovecs, so that we can allocate a maximum-size array
+ * of iovecs on the stack. */
+#define MAX_IOVS 128
+
+/* Maximum number of iovecs that may be passed to sendmsg, capped at a
+ * minimum of _XOPEN_IOV_MAX (16) and a maximum of MAX_IOVS.



I don't know a great deal about IOV maximums so the next comment may not be
reasonable:
I found _XOPEN_IOV_MAX a surprising minimum.  Wouldn't sysconf's opinion of
max_iovs be less than _XOPEN_IOV_MAX in rare error conditions?  In this case
wouldn't 1 be a safer minimum choice?



+        } else if (max_iovs > MAX_IOVS) {
+            max_iovs = 128;


I think you want max_iovs = MAX_IOVS.


+    max_batch_count = MAX(sock->rcvbuf / 4096, 1);
+    max_batch_count = MIN(max_batch_count, MAX_IOVS / 2);


Did you choose 4096 here because it's the typical page size?  Would it make
sense to use the PAGESIZE macro if defined?

+        if (error == ENOBUFS) {
+            VLOG_DBG_RL(&rl, "receive buffer overflow, resending request");


Is this code supposed to resend the request itself, or is the caller
responsible for handling that?


+        } else if (error) {
+            VLOG_ERR_RL(&rl, "transaction error (%s)", strerror(error));
+            nl_sock_record_errors__(transactions, n, error);
+        }
+    }
+}

We may want to undef MAX_BATCH_BYTES.  Probably doesn't matter, but seems
stylistically cleaner.

On Thu, Oct 13, 2011 at 13:22, Ethan Jackson <[email protected]> wrote:
> +/* Compile-time limit on iovecs, so that we can allocate a
> maximum-size array+ * of iovecs on the stack. */+#define MAX_IOVS
> 128++/* Maximum number of iovecs that may be passed to sendmsg, capped
> at a+ * minimum of _XOPEN_IOV_MAX (16) and a maximum of MAX_IOVS.
> I don't know a great deal about IOV maximums so the next comment may
> not bereasonable:I found _XOPEN_IOV_MAX a surprising minimum.
> Wouldn't sysconf's opinion ofmax_iovs be less than _XOPEN_IOV_MAX in
> rare error conditions?  In this casewouldn't 1 be a safer minimum
> choice?
>
> +        } else if (max_iovs > MAX_IOVS) {+            max_iovs = 128;
> I think you want max_iovs = MAX_IOVS.
> +    max_batch_count = MAX(sock->rcvbuf / 4096, 1);+
> max_batch_count = MIN(max_batch_count, MAX_IOVS / 2);
> Did you choose 4096 here because it's the typical page size?  Would it
> makesense to use the PAGESIZE macro if defined?
> +        if (error == ENOBUFS) {+            VLOG_DBG_RL(&rl, "receive
> buffer overflow, resending request");
> Is this code supposed to resend the request itself, or is the
> callerresponsible for handling that?
>
> +        } else if (error) {+            VLOG_ERR_RL(&rl, "transaction
> error (%s)", strerror(error));+
> nl_sock_record_errors__(transactions, n, error);+        }+    }+}
> We may want to undef MAX_BATCH_BYTES.  Probably doesn't matter, but
> seemsstylistically cleaner.
> Ethan
>
> On Tue, Sep 27, 2011 at 16:27, Ben Pfaff <[email protected]> wrote:
>> This will be used in an upcoming commit.
>> ---
>>  lib/netlink-socket.c |  302 
>> ++++++++++++++++++++++++++++++++++++++++----------
>>  lib/netlink-socket.h |   14 +++
>>  2 files changed, 259 insertions(+), 57 deletions(-)
>>
>> diff --git a/lib/netlink-socket.c b/lib/netlink-socket.c
>> index 0f4e3e7..98697e2 100644
>> --- a/lib/netlink-socket.c
>> +++ b/lib/netlink-socket.c
>> @@ -21,6 +21,7 @@
>>  #include <inttypes.h>
>>  #include <stdlib.h>
>>  #include <sys/types.h>
>> +#include <sys/uio.h>
>>  #include <unistd.h>
>>  #include "coverage.h"
>>  #include "dynamic-string.h"
>> @@ -32,6 +33,7 @@
>>  #include "poll-loop.h"
>>  #include "socket-util.h"
>>  #include "stress.h"
>> +#include "util.h"
>>  #include "vlog.h"
>>
>>  VLOG_DEFINE_THIS_MODULE(netlink_socket);
>> @@ -63,8 +65,19 @@ struct nl_sock
>>     uint32_t pid;
>>     int protocol;
>>     struct nl_dump *dump;
>> +    unsigned int rcvbuf;        /* Receive buffer size (SO_RCVBUF). */
>>  };
>>
>> +/* Compile-time limit on iovecs, so that we can allocate a maximum-size 
>> array
>> + * of iovecs on the stack. */
>> +#define MAX_IOVS 128
>> +
>> +/* Maximum number of iovecs that may be passed to sendmsg, capped at a
>> + * minimum of _XOPEN_IOV_MAX (16) and a maximum of MAX_IOVS.
>> + *
>> + * Initialized by nl_sock_create(). */
>> +static int max_iovs;
>> +
>>  static int alloc_pid(uint32_t *);
>>  static void free_pid(uint32_t);
>>  static int nl_sock_cow__(struct nl_sock *);
>> @@ -79,6 +92,23 @@ nl_sock_create(int protocol, struct nl_sock **sockp)
>>     struct sockaddr_nl local, remote;
>>     int retval = 0;
>>
>> +    if (!max_iovs) {
>> +        int save_errno = errno;
>> +        errno = 0;
>> +
>> +        max_iovs = sysconf(_SC_UIO_MAXIOV);
>> +        if (max_iovs < _XOPEN_IOV_MAX) {
>> +            if (max_iovs == -1 && errno) {
>> +                VLOG_WARN("sysconf(_SC_UIO_MAXIOV): %s", strerror(errno));
>> +            }
>> +            max_iovs = _XOPEN_IOV_MAX;
>> +        } else if (max_iovs > MAX_IOVS) {
>> +            max_iovs = 128;
>> +        }
>> +
>> +        errno = save_errno;
>> +    }
>> +
>>     *sockp = NULL;
>>     sock = malloc(sizeof *sock);
>>     if (sock == NULL) {
>> @@ -93,6 +123,13 @@ nl_sock_create(int protocol, struct nl_sock **sockp)
>>     sock->protocol = protocol;
>>     sock->dump = NULL;
>>
>> +    retval = get_socket_rcvbuf(sock->fd);
>> +    if (retval < 0) {
>> +        retval = -retval;
>> +        goto error;
>> +    }
>> +    sock->rcvbuf = retval;
>> +
>>     retval = alloc_pid(&sock->pid);
>>     if (retval) {
>>         goto error;
>> @@ -354,6 +391,204 @@ nl_sock_recv(struct nl_sock *sock, struct ofpbuf 
>> **bufp, bool wait)
>>     return nl_sock_recv__(sock, bufp, wait);
>>  }
>>
>> +static int
>> +find_nl_transaction_by_seq(struct nl_transaction **transactions, size_t n,
>> +                           uint32_t seq)
>> +{
>> +    int i;
>> +
>> +    for (i = 0; i < n; i++) {
>> +        struct nl_transaction *t = transactions[i];
>> +
>> +        if (seq == nl_msg_nlmsghdr(t->request)->nlmsg_seq) {
>> +            return i;
>> +        }
>> +    }
>> +
>> +    return -1;
>> +}
>> +
>> +static void
>> +nl_sock_record_errors__(struct nl_transaction **transactions, size_t n,
>> +                        int error)
>> +{
>> +    size_t i;
>> +
>> +    for (i = 0; i < n; i++) {
>> +        transactions[i]->error = error;
>> +        transactions[i]->reply = NULL;
>> +    }
>> +}
>> +
>> +static int
>> +nl_sock_transact_multiple__(struct nl_sock *sock,
>> +                            struct nl_transaction **transactions, size_t n,
>> +                            size_t *done)
>> +{
>> +    struct iovec iovs[MAX_IOVS];
>> +    struct msghdr msg;
>> +    int error;
>> +    int i;
>> +
>> +    *done = 0;
>> +    for (i = 0; i < n; i++) {
>> +        struct ofpbuf *request = transactions[i]->request;
>> +        struct nlmsghdr *nlmsg = nl_msg_nlmsghdr(request);
>> +
>> +        nlmsg->nlmsg_len = request->size;
>> +        nlmsg->nlmsg_pid = sock->pid;
>> +        if (i == n - 1) {
>> +            /* Ensure that we get a reply even if the final request doesn't
>> +             * ordinarily call for one. */
>> +            nlmsg->nlmsg_flags |= NLM_F_ACK;
>> +        }
>> +
>> +        iovs[i].iov_base = request->data;
>> +        iovs[i].iov_len = request->size;
>> +    }
>> +
>> +    memset(&msg, 0, sizeof msg);
>> +    msg.msg_iov = iovs;
>> +    msg.msg_iovlen = n;
>> +    do {
>> +        error = sendmsg(sock->fd, &msg, 0) < 0 ? errno : 0;
>> +    } while (error == EINTR);
>> +
>> +    for (i = 0; i < n; i++) {
>> +        struct ofpbuf *request = transactions[i]->request;
>> +
>> +        log_nlmsg(__func__, error, request->data, request->size,
>> +                  sock->protocol);
>> +    }
>> +    if (!error) {
>> +        COVERAGE_ADD(netlink_sent, n);
>> +    }
>> +
>> +    if (error) {
>> +        return error;
>> +    }
>> +
>> +    while (n > 0) {
>> +        struct ofpbuf *reply;
>> +
>> +        error = nl_sock_recv__(sock, &reply, true);
>> +        if (error) {
>> +            return error;
>> +        }
>> +
>> +        i = find_nl_transaction_by_seq(transactions, n,
>> +                                       nl_msg_nlmsghdr(reply)->nlmsg_seq);
>> +        if (i < 0) {
>> +            VLOG_DBG_RL(&rl, "ignoring unexpected seq %#"PRIx32,
>> +                        nl_msg_nlmsghdr(reply)->nlmsg_seq);
>> +            ofpbuf_delete(reply);
>> +            continue;
>> +        }
>> +
>> +        nl_sock_record_errors__(transactions, i, 0);
>> +        if (nl_msg_nlmsgerr(reply, &error)) {
>> +            transactions[i]->reply = NULL;
>> +            transactions[i]->error = error;
>> +            if (error) {
>> +                VLOG_DBG_RL(&rl, "received NAK error=%d (%s)",
>> +                            error, strerror(error));
>> +            }
>> +            ofpbuf_delete(reply);
>> +        } else {
>> +            transactions[i]->reply = reply;
>> +            transactions[i]->error = 0;
>> +        }
>> +
>> +        *done += i + 1;
>> +        transactions += i + 1;
>> +        n -= i + 1;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +/* Sends the 'request' member of the 'n' transactions in 'transactions' to 
>> the
>> + * kernel, in order, and waits for responses to all of them.  Fills in the
>> + * 'error' member of each transaction with 0 if it was successful, otherwise
>> + * with a positive errno value.  'reply' will be NULL on error or if the
>> + * transaction was successful but had no reply beyond an indication of 
>> success.
>> + * For a successful transaction that did have a more detailed reply, 'reply'
>> + * will be set to the reply message.
>> + *
>> + * The caller is responsible for destroying each request and reply, and the
>> + * transactions array itself.
>> + *
>> + * Before sending each message, this function will finalize nlmsg_len in 
>> each
>> + * 'request' to match the ofpbuf's size, and set nlmsg_pid to 'sock''s pid.
>> + * NLM_F_ACK will be added to some requests' nlmsg_flags.
>> + *
>> + * Bare Netlink is an unreliable transport protocol.  This function layers
>> + * reliable delivery and reply semantics on top of bare Netlink.  See
>> + * nl_sock_transact() for some caveats.
>> + */
>> +void
>> +nl_sock_transact_multiple(struct nl_sock *sock,
>> +                          struct nl_transaction **transactions, size_t n)
>> +{
>> +    int max_batch_count;
>> +    int error;
>> +
>> +    if (!n) {
>> +        return;
>> +    }
>> +
>> +    error = nl_sock_cow__(sock);
>> +    if (error) {
>> +        nl_sock_record_errors__(transactions, n, error);
>> +        return;
>> +    }
>> +
>> +    /* In theory, every request could have a 64 kB reply.  But the default 
>> and
>> +     * maximum socket rcvbuf size with typical Dom0 memory sizes both tend 
>> to
>> +     * be a bit below 128 kB, so that would only allow a single message in a
>> +     * "batch".  So we assume that replies average (at most) 4 kB, which 
>> allows
>> +     * a good deal of batching.
>> +     *
>> +     * Each request uses 2 iovecs so we cap batching at MAX_IOVS / 2.
>> +     *
>> +     * In practice, most of the requests that we batch either have no reply 
>> at
>> +     * all or a brief reply. */
>> +    max_batch_count = MAX(sock->rcvbuf / 4096, 1);
>> +    max_batch_count = MIN(max_batch_count, MAX_IOVS / 2);
>> +
>> +    while (n > 0) {
>> +        size_t count, bytes;
>> +        size_t done;
>> +
>> +        /* Batch up to 'max_batch_count' transactions.  But cap it at about 
>> a
>> +         * page of requests total because big skbuffs are expensive to
>> +         * allocate in the kernel.  */
>> +#if defined(PAGESIZE)
>> +#define MAX_BATCH_BYTES MAX(1, PAGESIZE - 512)
>> +#else
>> +#define MAX_BATCH_BYTES (4096 - 512)
>> +#endif
>> +        bytes = transactions[0]->request->size;
>> +        for (count = 1; count < n && count < max_batch_count; count++) {
>> +            if (bytes + transactions[count]->request->size > 
>> MAX_BATCH_BYTES) {
>> +                break;
>> +            }
>> +            bytes += transactions[count]->request->size;
>> +        }
>> +
>> +        error = nl_sock_transact_multiple__(sock, transactions, count, 
>> &done);
>> +        transactions += done;
>> +        n -= done;
>> +
>> +        if (error == ENOBUFS) {
>> +            VLOG_DBG_RL(&rl, "receive buffer overflow, resending request");
>> +        } else if (error) {
>> +            VLOG_ERR_RL(&rl, "transaction error (%s)", strerror(error));
>> +            nl_sock_record_errors__(transactions, n, error);
>> +        }
>> +    }
>> +}
>> +
>>  /* Sends 'request' to the kernel via 'sock' and waits for a response.  If
>>  * successful, returns 0.  On failure, returns a positive errno value.
>>  *
>> @@ -395,68 +630,21 @@ nl_sock_recv(struct nl_sock *sock, struct ofpbuf 
>> **bufp, bool wait)
>>  *         needs to be idempotent.
>>  */
>>  int
>> -nl_sock_transact(struct nl_sock *sock,
>> -                 const struct ofpbuf *request, struct ofpbuf **replyp)
>> +nl_sock_transact(struct nl_sock *sock, const struct ofpbuf *request,
>> +                 struct ofpbuf **replyp)
>>  {
>> -    uint32_t seq = nl_msg_nlmsghdr(request)->nlmsg_seq;
>> -    struct nlmsghdr *nlmsghdr;
>> -    struct ofpbuf *reply;
>> -    int retval;
>> +    struct nl_transaction *transactionp;
>> +    struct nl_transaction transaction;
>>
>> +    transaction.request = (struct ofpbuf *) request;
>> +    transactionp = &transaction;
>> +    nl_sock_transact_multiple(sock, &transactionp, 1);
>>     if (replyp) {
>> -        *replyp = NULL;
>> -    }
>> -
>> -    /* Ensure that we get a reply even if this message doesn't ordinarily 
>> call
>> -     * for one. */
>> -    nl_msg_nlmsghdr(request)->nlmsg_flags |= NLM_F_ACK;
>> -
>> -send:
>> -    retval = nl_sock_send(sock, request, true);
>> -    if (retval) {
>> -        return retval;
>> -    }
>> -
>> -recv:
>> -    retval = nl_sock_recv(sock, &reply, true);
>> -    if (retval) {
>> -        if (retval == ENOBUFS) {
>> -            COVERAGE_INC(netlink_overflow);
>> -            VLOG_DBG_RL(&rl, "receive buffer overflow, resending request");
>> -            goto send;
>> -        } else {
>> -            return retval;
>> -        }
>> -    }
>> -    nlmsghdr = nl_msg_nlmsghdr(reply);
>> -    if (seq != nlmsghdr->nlmsg_seq) {
>> -        VLOG_DBG_RL(&rl, "ignoring seq %#"PRIx32" != expected %#"PRIx32,
>> -                    nl_msg_nlmsghdr(reply)->nlmsg_seq, seq);
>> -        ofpbuf_delete(reply);
>> -        goto recv;
>> -    }
>> -
>> -    /* If the reply is an error, discard the reply and return the error 
>> code.
>> -     *
>> -     * Except: if the reply is just an acknowledgement (error code of 0), 
>> and
>> -     * the caller is interested in the reply (replyp != NULL), pass the 
>> reply
>> -     * up to the caller.  Otherwise the caller will get a return value of 0
>> -     * and null '*replyp', which makes unwary callers likely to segfault. */
>> -    if (nl_msg_nlmsgerr(reply, &retval) && (retval || !replyp)) {
>> -        ofpbuf_delete(reply);
>> -        if (retval) {
>> -            VLOG_DBG_RL(&rl, "received NAK error=%d (%s)",
>> -                        retval, strerror(retval));
>> -        }
>> -        return retval != EAGAIN ? retval : EPROTO;
>> -    }
>> -
>> -    if (replyp) {
>> -        *replyp = reply;
>> +        *replyp = transaction.reply;
>>     } else {
>> -        ofpbuf_delete(reply);
>> +        ofpbuf_delete(transaction.reply);
>>     }
>> -    return 0;
>> +    return transaction.error;
>>  }
>>
>>  /* Drain all the messages currently in 'sock''s receive queue. */
>> diff --git a/lib/netlink-socket.h b/lib/netlink-socket.h
>> index d789f41..7e01acb 100644
>> --- a/lib/netlink-socket.h
>> +++ b/lib/netlink-socket.h
>> @@ -35,6 +35,7 @@
>>  #include <stdbool.h>
>>  #include <stddef.h>
>>  #include <stdint.h>
>> +#include "list.h"
>>
>>  struct ofpbuf;
>>  struct nl_sock;
>> @@ -63,6 +64,19 @@ short int nl_sock_woke(const struct nl_sock *);
>>
>>  uint32_t nl_sock_pid(const struct nl_sock *);
>>
>> +/* Batching transactions. */
>> +struct nl_transaction {
>> +    /* Filled in by client. */
>> +    struct ofpbuf *request;     /* Request to send. */
>> +
>> +    /* Filled in by nl_sock_transact_batch(). */
>> +    struct ofpbuf *reply;       /* Reply (NULL if reply was an error code). 
>> */
>> +    int error;                  /* Positive errno value, 0 if no error. */
>> +};
>> +
>> +void nl_sock_transact_multiple(struct nl_sock *,
>> +                               struct nl_transaction **, size_t n);
>> +
>>  /* Table dumping. */
>>  struct nl_dump {
>>     struct nl_sock *sock;       /* Socket being dumped. */
>> --
>> 1.7.4.4
>>
>> _______________________________________________
>> dev mailing list
>> [email protected]
>> http://openvswitch.org/mailman/listinfo/dev
>>
>
_______________________________________________
dev mailing list
[email protected]
http://openvswitch.org/mailman/listinfo/dev

Reply via email to