Formatting got messed up on that. Trying again:
+/* Compile-time limit on iovecs, so that we can allocate a maximum-size array
+ * of iovecs on the stack. */
+#define MAX_IOVS 128
+
+/* Maximum number of iovecs that may be passed to sendmsg, capped at a
+ * minimum of _XOPEN_IOV_MAX (16) and a maximum of MAX_IOVS.
I don't know a great deal about IOV maximums, so this next comment may not be
reasonable:
I found _XOPEN_IOV_MAX a surprising choice of minimum. Couldn't sysconf()
report a value for max_iovs that is less than _XOPEN_IOV_MAX in rare error
conditions? If so, wouldn't 1 be a safer minimum?
+ } else if (max_iovs > MAX_IOVS) {
+ max_iovs = 128;
I think you want max_iovs = MAX_IOVS here rather than the hard-coded 128.
+ max_batch_count = MAX(sock->rcvbuf / 4096, 1);
+ max_batch_count = MIN(max_batch_count, MAX_IOVS / 2);
Did you choose 4096 here because it's the typical page size? Would it make
sense to use the PAGESIZE macro if defined?
+ if (error == ENOBUFS) {
+ VLOG_DBG_RL(&rl, "receive buffer overflow, resending request");
Is this code supposed to resend the request itself, or is the caller
responsible for handling that?
+ } else if (error) {
+ VLOG_ERR_RL(&rl, "transaction error (%s)", strerror(error));
+ nl_sock_record_errors__(transactions, n, error);
+ }
+ }
+}
We may want to #undef MAX_BATCH_BYTES once it is no longer needed. It probably
doesn't matter, but it seems stylistically cleaner.
On Thu, Oct 13, 2011 at 13:22, Ethan Jackson <[email protected]> wrote:
> +/* Compile-time limit on iovecs, so that we can allocate a
> maximum-size array+ * of iovecs on the stack. */+#define MAX_IOVS
> 128++/* Maximum number of iovecs that may be passed to sendmsg, capped
> at a+ * minimum of _XOPEN_IOV_MAX (16) and a maximum of MAX_IOVS.
> I don't know a great deal about IOV maximums so the next comment may
> not be reasonable: I found _XOPEN_IOV_MAX a surprising minimum.
> Wouldn't sysconf's opinion of max_iovs be less than _XOPEN_IOV_MAX in
> rare error conditions? In this case wouldn't 1 be a safer minimum
> choice?
>
> + } else if (max_iovs > MAX_IOVS) {+ max_iovs = 128;
> I think you want max_iovs = MAX_IOVS.
> + max_batch_count = MAX(sock->rcvbuf / 4096, 1);+
> max_batch_count = MIN(max_batch_count, MAX_IOVS / 2);
> Did you choose 4096 here because it's the typical page size? Would it
> make sense to use the PAGESIZE macro if defined?
> + if (error == ENOBUFS) {+ VLOG_DBG_RL(&rl, "receive
> buffer overflow, resending request");
> Is this code supposed to resend the request itself, or is the
> caller responsible for handling that?
>
> + } else if (error) {+ VLOG_ERR_RL(&rl, "transaction
> error (%s)", strerror(error));+
> nl_sock_record_errors__(transactions, n, error);+ }+ }+}
> We may want to undef MAX_BATCH_BYTES. Probably doesn't matter, but
> seems stylistically cleaner.
> Ethan
>
> On Tue, Sep 27, 2011 at 16:27, Ben Pfaff <[email protected]> wrote:
>> This will be used in an upcoming commit.
>> ---
>> lib/netlink-socket.c | 302
>> ++++++++++++++++++++++++++++++++++++++++----------
>> lib/netlink-socket.h | 14 +++
>> 2 files changed, 259 insertions(+), 57 deletions(-)
>>
>> diff --git a/lib/netlink-socket.c b/lib/netlink-socket.c
>> index 0f4e3e7..98697e2 100644
>> --- a/lib/netlink-socket.c
>> +++ b/lib/netlink-socket.c
>> @@ -21,6 +21,7 @@
>> #include <inttypes.h>
>> #include <stdlib.h>
>> #include <sys/types.h>
>> +#include <sys/uio.h>
>> #include <unistd.h>
>> #include "coverage.h"
>> #include "dynamic-string.h"
>> @@ -32,6 +33,7 @@
>> #include "poll-loop.h"
>> #include "socket-util.h"
>> #include "stress.h"
>> +#include "util.h"
>> #include "vlog.h"
>>
>> VLOG_DEFINE_THIS_MODULE(netlink_socket);
>> @@ -63,8 +65,19 @@ struct nl_sock
>> uint32_t pid;
>> int protocol;
>> struct nl_dump *dump;
>> + unsigned int rcvbuf; /* Receive buffer size (SO_RCVBUF). */
>> };
>>
>> +/* Compile-time limit on iovecs, so that we can allocate a maximum-size
>> array
>> + * of iovecs on the stack. */
>> +#define MAX_IOVS 128
>> +
>> +/* Maximum number of iovecs that may be passed to sendmsg, capped at a
>> + * minimum of _XOPEN_IOV_MAX (16) and a maximum of MAX_IOVS.
>> + *
>> + * Initialized by nl_sock_create(). */
>> +static int max_iovs;
>> +
>> static int alloc_pid(uint32_t *);
>> static void free_pid(uint32_t);
>> static int nl_sock_cow__(struct nl_sock *);
>> @@ -79,6 +92,23 @@ nl_sock_create(int protocol, struct nl_sock **sockp)
>> struct sockaddr_nl local, remote;
>> int retval = 0;
>>
>> + if (!max_iovs) {
>> + int save_errno = errno;
>> + errno = 0;
>> +
>> + max_iovs = sysconf(_SC_UIO_MAXIOV);
>> + if (max_iovs < _XOPEN_IOV_MAX) {
>> + if (max_iovs == -1 && errno) {
>> + VLOG_WARN("sysconf(_SC_UIO_MAXIOV): %s", strerror(errno));
>> + }
>> + max_iovs = _XOPEN_IOV_MAX;
>> + } else if (max_iovs > MAX_IOVS) {
>> + max_iovs = 128;
>> + }
>> +
>> + errno = save_errno;
>> + }
>> +
>> *sockp = NULL;
>> sock = malloc(sizeof *sock);
>> if (sock == NULL) {
>> @@ -93,6 +123,13 @@ nl_sock_create(int protocol, struct nl_sock **sockp)
>> sock->protocol = protocol;
>> sock->dump = NULL;
>>
>> + retval = get_socket_rcvbuf(sock->fd);
>> + if (retval < 0) {
>> + retval = -retval;
>> + goto error;
>> + }
>> + sock->rcvbuf = retval;
>> +
>> retval = alloc_pid(&sock->pid);
>> if (retval) {
>> goto error;
>> @@ -354,6 +391,204 @@ nl_sock_recv(struct nl_sock *sock, struct ofpbuf
>> **bufp, bool wait)
>> return nl_sock_recv__(sock, bufp, wait);
>> }
>>
>> +static int
>> +find_nl_transaction_by_seq(struct nl_transaction **transactions, size_t n,
>> + uint32_t seq)
>> +{
>> + int i;
>> +
>> + for (i = 0; i < n; i++) {
>> + struct nl_transaction *t = transactions[i];
>> +
>> + if (seq == nl_msg_nlmsghdr(t->request)->nlmsg_seq) {
>> + return i;
>> + }
>> + }
>> +
>> + return -1;
>> +}
>> +
>> +static void
>> +nl_sock_record_errors__(struct nl_transaction **transactions, size_t n,
>> + int error)
>> +{
>> + size_t i;
>> +
>> + for (i = 0; i < n; i++) {
>> + transactions[i]->error = error;
>> + transactions[i]->reply = NULL;
>> + }
>> +}
>> +
>> +static int
>> +nl_sock_transact_multiple__(struct nl_sock *sock,
>> + struct nl_transaction **transactions, size_t n,
>> + size_t *done)
>> +{
>> + struct iovec iovs[MAX_IOVS];
>> + struct msghdr msg;
>> + int error;
>> + int i;
>> +
>> + *done = 0;
>> + for (i = 0; i < n; i++) {
>> + struct ofpbuf *request = transactions[i]->request;
>> + struct nlmsghdr *nlmsg = nl_msg_nlmsghdr(request);
>> +
>> + nlmsg->nlmsg_len = request->size;
>> + nlmsg->nlmsg_pid = sock->pid;
>> + if (i == n - 1) {
>> + /* Ensure that we get a reply even if the final request doesn't
>> + * ordinarily call for one. */
>> + nlmsg->nlmsg_flags |= NLM_F_ACK;
>> + }
>> +
>> + iovs[i].iov_base = request->data;
>> + iovs[i].iov_len = request->size;
>> + }
>> +
>> + memset(&msg, 0, sizeof msg);
>> + msg.msg_iov = iovs;
>> + msg.msg_iovlen = n;
>> + do {
>> + error = sendmsg(sock->fd, &msg, 0) < 0 ? errno : 0;
>> + } while (error == EINTR);
>> +
>> + for (i = 0; i < n; i++) {
>> + struct ofpbuf *request = transactions[i]->request;
>> +
>> + log_nlmsg(__func__, error, request->data, request->size,
>> + sock->protocol);
>> + }
>> + if (!error) {
>> + COVERAGE_ADD(netlink_sent, n);
>> + }
>> +
>> + if (error) {
>> + return error;
>> + }
>> +
>> + while (n > 0) {
>> + struct ofpbuf *reply;
>> +
>> + error = nl_sock_recv__(sock, &reply, true);
>> + if (error) {
>> + return error;
>> + }
>> +
>> + i = find_nl_transaction_by_seq(transactions, n,
>> + nl_msg_nlmsghdr(reply)->nlmsg_seq);
>> + if (i < 0) {
>> + VLOG_DBG_RL(&rl, "ignoring unexpected seq %#"PRIx32,
>> + nl_msg_nlmsghdr(reply)->nlmsg_seq);
>> + ofpbuf_delete(reply);
>> + continue;
>> + }
>> +
>> + nl_sock_record_errors__(transactions, i, 0);
>> + if (nl_msg_nlmsgerr(reply, &error)) {
>> + transactions[i]->reply = NULL;
>> + transactions[i]->error = error;
>> + if (error) {
>> + VLOG_DBG_RL(&rl, "received NAK error=%d (%s)",
>> + error, strerror(error));
>> + }
>> + ofpbuf_delete(reply);
>> + } else {
>> + transactions[i]->reply = reply;
>> + transactions[i]->error = 0;
>> + }
>> +
>> + *done += i + 1;
>> + transactions += i + 1;
>> + n -= i + 1;
>> + }
>> +
>> + return 0;
>> +}
>> +
>> +/* Sends the 'request' member of the 'n' transactions in 'transactions' to
>> the
>> + * kernel, in order, and waits for responses to all of them. Fills in the
>> + * 'error' member of each transaction with 0 if it was successful, otherwise
>> + * with a positive errno value. 'reply' will be NULL on error or if the
>> + * transaction was successful but had no reply beyond an indication of
>> success.
>> + * For a successful transaction that did have a more detailed reply, 'reply'
>> + * will be set to the reply message.
>> + *
>> + * The caller is responsible for destroying each request and reply, and the
>> + * transactions array itself.
>> + *
>> + * Before sending each message, this function will finalize nlmsg_len in
>> each
>> + * 'request' to match the ofpbuf's size, and set nlmsg_pid to 'sock''s pid.
>> + * NLM_F_ACK will be added to some requests' nlmsg_flags.
>> + *
>> + * Bare Netlink is an unreliable transport protocol. This function layers
>> + * reliable delivery and reply semantics on top of bare Netlink. See
>> + * nl_sock_transact() for some caveats.
>> + */
>> +void
>> +nl_sock_transact_multiple(struct nl_sock *sock,
>> + struct nl_transaction **transactions, size_t n)
>> +{
>> + int max_batch_count;
>> + int error;
>> +
>> + if (!n) {
>> + return;
>> + }
>> +
>> + error = nl_sock_cow__(sock);
>> + if (error) {
>> + nl_sock_record_errors__(transactions, n, error);
>> + return;
>> + }
>> +
>> + /* In theory, every request could have a 64 kB reply. But the default
>> and
>> + * maximum socket rcvbuf size with typical Dom0 memory sizes both tend
>> to
>> + * be a bit below 128 kB, so that would only allow a single message in a
>> + * "batch". So we assume that replies average (at most) 4 kB, which
>> allows
>> + * a good deal of batching.
>> + *
>> + * Each request uses 2 iovecs so we cap batching at MAX_IOVS / 2.
>> + *
>> + * In practice, most of the requests that we batch either have no reply
>> at
>> + * all or a brief reply. */
>> + max_batch_count = MAX(sock->rcvbuf / 4096, 1);
>> + max_batch_count = MIN(max_batch_count, MAX_IOVS / 2);
>> +
>> + while (n > 0) {
>> + size_t count, bytes;
>> + size_t done;
>> +
>> + /* Batch up to 'max_batch_count' transactions. But cap it at about
>> a
>> + * page of requests total because big skbuffs are expensive to
>> + * allocate in the kernel. */
>> +#if defined(PAGESIZE)
>> +#define MAX_BATCH_BYTES MAX(1, PAGESIZE - 512)
>> +#else
>> +#define MAX_BATCH_BYTES (4096 - 512)
>> +#endif
>> + bytes = transactions[0]->request->size;
>> + for (count = 1; count < n && count < max_batch_count; count++) {
>> + if (bytes + transactions[count]->request->size >
>> MAX_BATCH_BYTES) {
>> + break;
>> + }
>> + bytes += transactions[count]->request->size;
>> + }
>> +
>> + error = nl_sock_transact_multiple__(sock, transactions, count,
>> &done);
>> + transactions += done;
>> + n -= done;
>> +
>> + if (error == ENOBUFS) {
>> + VLOG_DBG_RL(&rl, "receive buffer overflow, resending request");
>> + } else if (error) {
>> + VLOG_ERR_RL(&rl, "transaction error (%s)", strerror(error));
>> + nl_sock_record_errors__(transactions, n, error);
>> + }
>> + }
>> +}
>> +
>> /* Sends 'request' to the kernel via 'sock' and waits for a response. If
>> * successful, returns 0. On failure, returns a positive errno value.
>> *
>> @@ -395,68 +630,21 @@ nl_sock_recv(struct nl_sock *sock, struct ofpbuf
>> **bufp, bool wait)
>> * needs to be idempotent.
>> */
>> int
>> -nl_sock_transact(struct nl_sock *sock,
>> - const struct ofpbuf *request, struct ofpbuf **replyp)
>> +nl_sock_transact(struct nl_sock *sock, const struct ofpbuf *request,
>> + struct ofpbuf **replyp)
>> {
>> - uint32_t seq = nl_msg_nlmsghdr(request)->nlmsg_seq;
>> - struct nlmsghdr *nlmsghdr;
>> - struct ofpbuf *reply;
>> - int retval;
>> + struct nl_transaction *transactionp;
>> + struct nl_transaction transaction;
>>
>> + transaction.request = (struct ofpbuf *) request;
>> + transactionp = &transaction;
>> + nl_sock_transact_multiple(sock, &transactionp, 1);
>> if (replyp) {
>> - *replyp = NULL;
>> - }
>> -
>> - /* Ensure that we get a reply even if this message doesn't ordinarily
>> call
>> - * for one. */
>> - nl_msg_nlmsghdr(request)->nlmsg_flags |= NLM_F_ACK;
>> -
>> -send:
>> - retval = nl_sock_send(sock, request, true);
>> - if (retval) {
>> - return retval;
>> - }
>> -
>> -recv:
>> - retval = nl_sock_recv(sock, &reply, true);
>> - if (retval) {
>> - if (retval == ENOBUFS) {
>> - COVERAGE_INC(netlink_overflow);
>> - VLOG_DBG_RL(&rl, "receive buffer overflow, resending request");
>> - goto send;
>> - } else {
>> - return retval;
>> - }
>> - }
>> - nlmsghdr = nl_msg_nlmsghdr(reply);
>> - if (seq != nlmsghdr->nlmsg_seq) {
>> - VLOG_DBG_RL(&rl, "ignoring seq %#"PRIx32" != expected %#"PRIx32,
>> - nl_msg_nlmsghdr(reply)->nlmsg_seq, seq);
>> - ofpbuf_delete(reply);
>> - goto recv;
>> - }
>> -
>> - /* If the reply is an error, discard the reply and return the error
>> code.
>> - *
>> - * Except: if the reply is just an acknowledgement (error code of 0),
>> and
>> - * the caller is interested in the reply (replyp != NULL), pass the
>> reply
>> - * up to the caller. Otherwise the caller will get a return value of 0
>> - * and null '*replyp', which makes unwary callers likely to segfault. */
>> - if (nl_msg_nlmsgerr(reply, &retval) && (retval || !replyp)) {
>> - ofpbuf_delete(reply);
>> - if (retval) {
>> - VLOG_DBG_RL(&rl, "received NAK error=%d (%s)",
>> - retval, strerror(retval));
>> - }
>> - return retval != EAGAIN ? retval : EPROTO;
>> - }
>> -
>> - if (replyp) {
>> - *replyp = reply;
>> + *replyp = transaction.reply;
>> } else {
>> - ofpbuf_delete(reply);
>> + ofpbuf_delete(transaction.reply);
>> }
>> - return 0;
>> + return transaction.error;
>> }
>>
>> /* Drain all the messages currently in 'sock''s receive queue. */
>> diff --git a/lib/netlink-socket.h b/lib/netlink-socket.h
>> index d789f41..7e01acb 100644
>> --- a/lib/netlink-socket.h
>> +++ b/lib/netlink-socket.h
>> @@ -35,6 +35,7 @@
>> #include <stdbool.h>
>> #include <stddef.h>
>> #include <stdint.h>
>> +#include "list.h"
>>
>> struct ofpbuf;
>> struct nl_sock;
>> @@ -63,6 +64,19 @@ short int nl_sock_woke(const struct nl_sock *);
>>
>> uint32_t nl_sock_pid(const struct nl_sock *);
>>
>> +/* Batching transactions. */
>> +struct nl_transaction {
>> + /* Filled in by client. */
>> + struct ofpbuf *request; /* Request to send. */
>> +
>> + /* Filled in by nl_sock_transact_batch(). */
>> + struct ofpbuf *reply; /* Reply (NULL if reply was an error code).
>> */
>> + int error; /* Positive errno value, 0 if no error. */
>> +};
>> +
>> +void nl_sock_transact_multiple(struct nl_sock *,
>> + struct nl_transaction **, size_t n);
>> +
>> /* Table dumping. */
>> struct nl_dump {
>> struct nl_sock *sock; /* Socket being dumped. */
>> --
>> 1.7.4.4
>>
>> _______________________________________________
>> dev mailing list
>> [email protected]
>> http://openvswitch.org/mailman/listinfo/dev
>>
>
_______________________________________________
dev mailing list
[email protected]
http://openvswitch.org/mailman/listinfo/dev