This patch modifies 'struct nl_dump' and nl_dump_next() to allow multiple threads to share the same nl_dump. The changes focus on synchronizing the dump status between multiple callers, and on allowing each caller to fully process its existing buffer before determining whether to stop fetching flows.
The lifecycle of 'dump->status' is as follows:
 * nl_dump_start() initializes 'status'. It may be zero or non-zero.
 * nl_dump_next() reads 'status' and may write a non-zero value.
 * nl_dump_done() reads 'status' and returns the error code.
nl_dump_next() will now attempt to read all messages from the given buffer before fetching more. Multiple threads may call this function with the same nl_dump, but each must provide an independent buffer. When nl_dump_next() encounters an error on the socket (or the final netlink message), it will set the shared error status. This error status is checked before attempting to fetch additional messages; if it is non-zero, the current caller should stop dumping flows.
In the receive loop, nl_dump_next() will now make non-blocking syscalls, to ensure that it regularly checks the status and breaks out if there is nothing further to dump.
As before, the final dump status can be determined by calling nl_dump_done() from a single thread.
Signed-off-by: Joe Stringer <joestrin...@nicira.com>
--- lib/netlink-socket.c | 68 ++++++++++++++++++++++++++++++-------------------- lib/netlink-socket.h | 15 +++++++++-- 2 files changed, 54 insertions(+), 29 deletions(-) diff --git a/lib/netlink-socket.c b/lib/netlink-socket.c index d2b0973..9ec63ff 100644 --- a/lib/netlink-socket.c +++ b/lib/netlink-socket.c @@ -689,14 +689,16 @@ nl_sock_drain(struct nl_sock *sock) void nl_dump_start(struct nl_dump *dump, int protocol, const struct ofpbuf *request) { - dump->status = nl_pool_alloc(protocol, &dump->sock); - if (dump->status) { + int status = nl_pool_alloc(protocol, &dump->sock); + + if (status) { return; } nl_msg_nlmsghdr(request)->nlmsg_flags |= NLM_F_DUMP | NLM_F_ACK; - dump->status = nl_sock_send__(dump->sock, request, - nl_sock_allocate_seq(dump->sock, 1), true); + status = nl_sock_send__(dump->sock, request, + nl_sock_allocate_seq(dump->sock, 1), true); + atomic_init(&dump->status, status); dump->seq = nl_msg_nlmsghdr(request)->nlmsg_seq; } @@ -707,7 +709,7 
@@ nl_dump_recv(struct nl_dump *dump, struct ofpbuf *buffer) struct nlmsghdr *nlmsghdr; int retval; - retval = nl_sock_recv__(dump->sock, buffer, true); + retval = nl_sock_recv__(dump->sock, buffer, false); if (retval) { return retval == EINTR ? EAGAIN : retval; } @@ -741,25 +743,32 @@ nl_dump_recv(struct nl_dump *dump, struct ofpbuf *buffer) * to 0. Failure might indicate an actual error or merely the end of replies. * An error status for the entire dump operation is provided when it is * completed by calling nl_dump_done(). + * + * Multiple threads may call this function, passing the same nl_dump, however + * each must provide an independent buffer. This function may cache multiple + * flows in the buffer, and these will be processed before more flows are + * fetched. When this function returns false, other threads may continue to + * process flows in their buffers, but they will not fetch more flows. */ bool nl_dump_next(struct nl_dump *dump, struct ofpbuf *reply, struct ofpbuf *buffer) { struct nlmsghdr *nlmsghdr; + int status = 0; reply->data = NULL; reply->size = 0; - if (dump->status) { - return false; - } while (!buffer->size) { - int retval = nl_dump_recv(dump, buffer); - if (retval) { + atomic_read(&dump->status, &status); + if (status) { + return false; + } + status = nl_dump_recv(dump, buffer); + if (status) { ofpbuf_clear(buffer); - if (retval != EAGAIN) { - dump->status = retval; - return false; + if (status != EAGAIN) { + goto end; } } } @@ -767,14 +776,16 @@ nl_dump_next(struct nl_dump *dump, struct ofpbuf *reply, struct ofpbuf *buffer) nlmsghdr = nl_msg_next(buffer, reply); if (!nlmsghdr) { VLOG_WARN_RL(&rl, "netlink dump reply contains message fragment"); - dump->status = EPROTO; - return false; + status = EPROTO; } else if (nlmsghdr->nlmsg_type == NLMSG_DONE) { - dump->status = EOF; - return false; + status = EOF; } - return true; +end: + if (status) { + atomic_store(&dump->status, status); + } + return status ? 
false : true; } /* Completes Netlink dump operation 'dump', which must have been initialized @@ -783,22 +794,25 @@ nl_dump_next(struct nl_dump *dump, struct ofpbuf *reply, struct ofpbuf *buffer) int nl_dump_done(struct nl_dump *dump) { - struct ofpbuf buf; + int status; /* Drain any remaining messages that the client didn't read. Otherwise the * kernel will continue to queue them up and waste buffer space. * * XXX We could just destroy and discard the socket in this case. */ - ofpbuf_init(&buf, 4096); - while (!dump->status) { - struct ofpbuf reply; - if (!nl_dump_next(dump, &reply, &buf)) { - ovs_assert(dump->status); - } + atomic_read(&dump->status, &status); + if (!status) { + struct ofpbuf reply, buf; + + ofpbuf_init(&buf, 4096); + while (nl_dump_next(dump, &reply, &buf)); + atomic_read(&dump->status, &status); + ovs_assert(status); + ofpbuf_uninit(&buf); } - ofpbuf_uninit(&buf); + atomic_destroy(&dump->status); nl_pool_release(dump->sock); - return dump->status == EOF ? 0 : dump->status; + return status == EOF ? 0 : status; } /* Causes poll_block() to wake up when any of the specified 'events' (which is diff --git a/lib/netlink-socket.h b/lib/netlink-socket.h index 4e1e588..5cd8713 100644 --- a/lib/netlink-socket.h +++ b/lib/netlink-socket.h @@ -35,13 +35,23 @@ * Thread-safety * ============= * - * Only a single thread may use a given nl_sock or nl_dump at one time. + * Most of the netlink functions are not fully thread-safe: Only a single + * thread may use a given nl_sock or nl_dump at one time. The exceptions are: + * + * - nl_sock_recv() is conditionally thread-safe: it may be called from + * different threads with the same nl_sock, but each caller must provide + * an independent receive buffer. + * + * - nl_dump_next() is conditionally thread-safe: it may be called from + * different threads with the same nl_dump, but each caller must provide + * independent buffers. 
*/ #include <stdbool.h> #include <stddef.h> #include <stdint.h> #include "ofpbuf.h" +#include "ovs-atomic.h" struct nl_sock; @@ -99,7 +109,8 @@ void nl_transact_multiple(int protocol, struct nl_transaction **, size_t n); struct nl_dump { struct nl_sock *sock; /* Socket being dumped. */ uint32_t seq; /* Expected nlmsg_seq for replies. */ - int status; /* 0=OK, EOF=done, or positive errno value. */ + atomic_int status; /* 0=OK, EOF=done, or positive errno value. + * Only set non-zero after initialization. */ }; void nl_dump_start(struct nl_dump *, int protocol, -- 1.7.9.5 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev