I think we could take this further by getting rid of the flow put and execute, and having exec_upcalls instead just return the necessary actions and mask. At any rate, this is a good starting point.
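Roughly what I'm picturing (just a sketch -- 'struct upcall_result' and the
changed callback signature are names I made up, nothing in the patch):

    /* Hypothetical sketch only.  exec_upcalls() would fill in one result
     * per upcall instead of issuing the flow put and execute itself, and
     * dpif-netdev would apply the results afterward. */
    struct upcall_result {
        struct ofpbuf mask;       /* Flow mask for the new datapath flow. */
        struct ofpbuf actions;    /* Actions to install and to execute. */
    };

    typedef void exec_upcall_cb(struct dpif *, struct dpif_upcall *upcalls,
                                struct ofpbuf *bufs,
                                struct upcall_result *results, int cnt);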
In exec_upcalls, we assert that cnt is less than UPCALL_MAX_BATCH, which
seems wrong. UPCALL_MAX_BATCH is much smaller than the netdev RX batch
size, so I'm pretty sure we're going to violate this assertion on occasion.
Could we just fill up a batch, execute it, and then fill up another if more
packets remain? We should probably make UPCALL_MAX_BATCH 64 in that case.

I don't see why we need dp_netdev_disable_upcall or
dp_netdev_purge_userspace_queue; they could just be merged into the
functions that call them.

The fact that exec_upcalls calls dpif_print_packet() is a bit weird; it
feels like a layering violation. I'm conflicted, though. What do you think?

Ethan

On Sun, Jul 20, 2014 at 3:37 PM, Ryan Wilson <wr...@nicira.com> wrote:
> Typically, kernel datapath threads send upcalls to userspace where
> handler threads process the upcalls. For TAP and DPDK devices, the
> datapath threads operate in userspace, so there is no need for
> separate handler threads.
>
> This patch allows userspace datapath threads to directly call the
> ofproto upcall functions, eliminating the need for handler threads
> for datapaths of type 'netdev'.
>
> Signed-off-by: Ryan Wilson <wr...@nicira.com>
> ---
> v2: Fix race condition found during perf test
> v3: Addressed Daniele's comments
> v4: Addressed Ben's style comments, added packet batching
> v5: Rebase
> v6: Another rebase
> v7: Rebase
> ---
>  lib/dpif-linux.c              |    3 +
>  lib/dpif-netdev.c             |  395 +++++++++++++----------------------
>  lib/dpif-netdev.h             |    1 +
>  lib/dpif-provider.h           |   16 ++
>  lib/dpif.c                    |   92 +++++++---
>  lib/dpif.h                    |    8 +
>  ofproto/ofproto-dpif-upcall.c |  239 ++++++++++++++++---------
>  ofproto/ofproto-dpif-upcall.h |    5 +
>  8 files changed, 378 insertions(+), 381 deletions(-)
>
> diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
> index 4475731..b98413d 100644
> --- a/lib/dpif-linux.c
> +++ b/lib/dpif-linux.c
> @@ -1934,6 +1934,9 @@ const struct dpif_class dpif_linux_class = {
>      dpif_linux_recv,
>      dpif_linux_recv_wait,
>      dpif_linux_recv_purge,
> +    NULL,                       /* register_upcall_cb */
> +    NULL,                       /* enable_upcall */
> +    NULL,                       /* disable_upcall */
>  };
>
>  static int
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 09220b6..7f53111 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -78,11 +78,6 @@ DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
>  /* Configuration parameters. */
>  enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
>
> -/* Queues. */
> -enum { MAX_QUEUE_LEN = 128 };   /* Maximum number of packets per queue. */
> -enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 };
> -BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
> -
>  /* Protects against changes to 'dp_netdevs'. */
>  static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
>
> @@ -90,27 +85,18 @@ static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
>  static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex)
>      = SHASH_INITIALIZER(&dp_netdevs);
>
> -struct dp_netdev_upcall {
> -    struct dpif_upcall upcall;  /* Queued upcall information. */
> -    struct ofpbuf buf;          /* ofpbuf instance for upcall.packet. */
> -};
> +enum { MAX_QUEUE_LEN = 50 };    /* Maximum number of packets per userspace
> +                                 * queue. */
>
> -/* A queue passing packets from a struct dp_netdev to its clients (handlers).
> - *
> - *
> - * Thread-safety
> - * =============
> - *
> - * Any access at all requires the owning 'dp_netdev''s queue_rwlock and
> - * its own mutex. */
>  struct dp_netdev_queue {
> -    struct ovs_mutex mutex;
> -    struct seq *seq;            /* Incremented whenever a packet is queued. */
> -    struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
> -    unsigned int head OVS_GUARDED;
> -    unsigned int tail OVS_GUARDED;
> +    unsigned int packet_count;
> +
> +    struct dpif_upcall upcalls[MAX_QUEUE_LEN];
> +    struct ofpbuf bufs[MAX_QUEUE_LEN];
>  };
>
> +#define DP_NETDEV_QUEUE_INITIALIZER { .packet_count = 0 }
> +
>  /* Datapath based on the network device interface from netdev.h.
>   *
>   *
> @@ -125,11 +111,11 @@ struct dp_netdev_queue {
>   *    dp_netdev_mutex (global)
>   *    port_mutex
>   *    flow_mutex
> - *    queue_rwlock
>   */
>  struct dp_netdev {
>      const struct dpif_class *const class;
>      const char *const name;
> +    struct dpif *dpif;
>      struct ovs_refcount ref_cnt;
>      atomic_flag destroyed;
>
> @@ -142,15 +128,6 @@ struct dp_netdev {
>      struct classifier cls;
>      struct cmap flow_table OVS_GUARDED; /* Flow table. */
>
> -    /* Queues.
> -     *
> -     * 'queue_rwlock' protects the modification of 'handler_queues' and
> -     * 'n_handlers'.  The queue elements are protected by its
> -     * 'handler_queues''s mutex. */
> -    struct fat_rwlock queue_rwlock;
> -    struct dp_netdev_queue *handler_queues;
> -    uint32_t n_handlers;
> -
>      /* Statistics.
>       *
>       * ovsthread_stats is internally synchronized. */
> @@ -163,6 +140,11 @@ struct dp_netdev {
>      struct cmap ports;
>      struct seq *port_seq;       /* Incremented whenever a port changes. */
>
> +    /* Protects access to ofproto-dpif-upcall interface during revalidator
> +     * thread synchronization. */
> +    struct fat_rwlock upcall_rwlock;
> +    exec_upcall_cb *upcall_cb;  /* Callback function for executing upcalls. */
> +
>      /* Forwarding threads. */
>      struct latch exit_latch;
>      struct pmd_thread *pmd_threads;
> @@ -339,14 +321,14 @@ static int do_add_port(struct dp_netdev *dp, const char *devname,
>      OVS_REQUIRES(dp->port_mutex);
>  static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *)
>      OVS_REQUIRES(dp->port_mutex);
> -static void dp_netdev_destroy_all_queues(struct dp_netdev *dp)
> -    OVS_REQ_WRLOCK(dp->queue_rwlock);
>  static int dpif_netdev_open(const struct dpif_class *, const char *name,
>                              bool create, struct dpif **);
> -static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
> -                                      int queue_no, int type,
> -                                      const struct miniflow *,
> -                                      const struct nlattr *userdata);
> +static int dp_netdev_queue_userspace_packet(struct dp_netdev_queue *,
> +                                            struct ofpbuf *, int type,
> +                                            const struct miniflow *,
> +                                            const struct nlattr *);
> +static void dp_netdev_execute_userspace_queue(struct dp_netdev_queue *,
> +                                              struct dp_netdev *);
>  static void dp_netdev_execute_actions(struct dp_netdev *dp,
>                                        struct dpif_packet **, int c,
>                                        bool may_steal, struct pkt_metadata *,
> @@ -357,6 +339,7 @@ static void dp_netdev_port_input(struct dp_netdev *dp,
>                                   odp_port_t port_no);
>
>  static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
> +static void dp_netdev_disable_upcall(struct dp_netdev *);
>
>  static struct dpif_netdev *
>  dpif_netdev_cast(const struct dpif *dpif)
> @@ -484,14 +467,17 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
>      classifier_init(&dp->cls, NULL);
>      cmap_init(&dp->flow_table);
>
> -    fat_rwlock_init(&dp->queue_rwlock);
> -
>      ovsthread_stats_init(&dp->stats);
>
>      ovs_mutex_init(&dp->port_mutex);
>      cmap_init(&dp->ports);
>      dp->port_seq = seq_create();
>      latch_init(&dp->exit_latch);
> +    fat_rwlock_init(&dp->upcall_rwlock);
> +
> +    /* Disable upcalls by default. */
> +    dp_netdev_disable_upcall(dp);
> +    dp->upcall_cb = NULL;
>
>      ovs_mutex_lock(&dp->port_mutex);
>      error = do_add_port(dp, name, "internal", ODPP_LOCAL);
> @@ -523,31 +509,13 @@ dpif_netdev_open(const struct dpif_class *class, const char *name,
>      }
>      if (!error) {
>          *dpifp = create_dpif_netdev(dp);
> +        dp->dpif = *dpifp;
>      }
>      ovs_mutex_unlock(&dp_netdev_mutex);
>
>      return error;
>  }
>
> -static void
> -dp_netdev_purge_queues(struct dp_netdev *dp)
> -    OVS_REQ_WRLOCK(dp->queue_rwlock)
> -{
> -    int i;
> -
> -    for (i = 0; i < dp->n_handlers; i++) {
> -        struct dp_netdev_queue *q = &dp->handler_queues[i];
> -
> -        ovs_mutex_lock(&q->mutex);
> -        while (q->tail != q->head) {
> -            struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
> -            ofpbuf_uninit(&u->upcall.packet);
> -            ofpbuf_uninit(&u->buf);
> -        }
> -        ovs_mutex_unlock(&q->mutex);
> -    }
> -}
> -
>  /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
>   * through the 'dp_netdevs' shash while freeing 'dp'. */
>  static void
> @@ -576,17 +544,12 @@ dp_netdev_free(struct dp_netdev *dp)
>      }
>      ovsthread_stats_destroy(&dp->stats);
>
> -    fat_rwlock_wrlock(&dp->queue_rwlock);
> -    dp_netdev_destroy_all_queues(dp);
> -    fat_rwlock_unlock(&dp->queue_rwlock);
> -
> -    fat_rwlock_destroy(&dp->queue_rwlock);
> -
>      classifier_destroy(&dp->cls);
>      cmap_destroy(&dp->flow_table);
>      ovs_mutex_destroy(&dp->flow_mutex);
>      seq_destroy(dp->port_seq);
>      cmap_destroy(&dp->ports);
> +    fat_rwlock_destroy(&dp->upcall_rwlock);
>      latch_destroy(&dp->exit_latch);
>      free(CONST_CAST(char *, dp->name));
>      free(dp);
> @@ -1559,80 +1522,6 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
>      return 0;
>  }
>
> -static void
> -dp_netdev_destroy_all_queues(struct dp_netdev *dp)
> -    OVS_REQ_WRLOCK(dp->queue_rwlock)
> -{
> -    size_t i;
> -
> -    dp_netdev_purge_queues(dp);
> -
> -    for (i = 0; i < dp->n_handlers; i++) {
> -        struct dp_netdev_queue *q = &dp->handler_queues[i];
> -
> -        ovs_mutex_destroy(&q->mutex);
> -        seq_destroy(q->seq);
> -    }
> -    free(dp->handler_queues);
> -    dp->handler_queues = NULL;
> -    dp->n_handlers = 0;
> -}
> -
> -static void
> -dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers)
> -    OVS_REQ_WRLOCK(dp->queue_rwlock)
> -{
> -    if (dp->n_handlers != n_handlers) {
> -        size_t i;
> -
> -        dp_netdev_destroy_all_queues(dp);
> -
> -        dp->n_handlers = n_handlers;
> -        dp->handler_queues = xzalloc(n_handlers * sizeof *dp->handler_queues);
> -
> -        for (i = 0; i < n_handlers; i++) {
> -            struct dp_netdev_queue *q = &dp->handler_queues[i];
> -
> -            ovs_mutex_init(&q->mutex);
> -            q->seq = seq_create();
> -        }
> -    }
> -}
> -
> -static int
> -dpif_netdev_recv_set(struct dpif *dpif, bool enable)
> -{
> -    struct dp_netdev *dp = get_dp_netdev(dpif);
> -
> -    if ((dp->handler_queues != NULL) == enable) {
> -        return 0;
> -    }
> -
> -    fat_rwlock_wrlock(&dp->queue_rwlock);
> -    if (!enable) {
> -        dp_netdev_destroy_all_queues(dp);
> -    } else {
> -        dp_netdev_refresh_queues(dp, 1);
> -    }
> -    fat_rwlock_unlock(&dp->queue_rwlock);
> -
> -    return 0;
> -}
> -
> -static int
> -dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers)
> -{
> -    struct dp_netdev *dp = get_dp_netdev(dpif);
> -
> -    fat_rwlock_wrlock(&dp->queue_rwlock);
> -    if (dp->handler_queues) {
> -        dp_netdev_refresh_queues(dp, n_handlers);
> -    }
> -    fat_rwlock_unlock(&dp->queue_rwlock);
> -
> -    return 0;
> -}
> -
>  static int
>  dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
>                                uint32_t queue_id, uint32_t *priority)
> @@ -1641,97 +1530,6 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
>      return 0;
>  }
>
> -static bool
> -dp_netdev_recv_check(const struct dp_netdev *dp, const uint32_t handler_id)
> -    OVS_REQ_RDLOCK(dp->queue_rwlock)
> -{
> -    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
> -
> -    if (!dp->handler_queues) {
> -        VLOG_WARN_RL(&rl, "receiving upcall disabled");
> -        return false;
> -    }
> -
> -    if (handler_id >= dp->n_handlers) {
> -        VLOG_WARN_RL(&rl, "handler index out of bound");
> -        return false;
> -    }
> -
> -    return true;
> -}
> -
> -static int
> -dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id,
> -                 struct dpif_upcall *upcall, struct ofpbuf *buf)
> -{
> -    struct dp_netdev *dp = get_dp_netdev(dpif);
> -    struct dp_netdev_queue *q;
> -    int error = 0;
> -
> -    fat_rwlock_rdlock(&dp->queue_rwlock);
> -
> -    if (!dp_netdev_recv_check(dp, handler_id)) {
> -        error = EAGAIN;
> -        goto out;
> -    }
> -
> -    q = &dp->handler_queues[handler_id];
> -    ovs_mutex_lock(&q->mutex);
> -    if (q->head != q->tail) {
> -        struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
> -
> -        *upcall = u->upcall;
> -
> -        ofpbuf_uninit(buf);
> -        *buf = u->buf;
> -    } else {
> -        error = EAGAIN;
> -    }
> -    ovs_mutex_unlock(&q->mutex);
> -
> -out:
> -    fat_rwlock_unlock(&dp->queue_rwlock);
> -
> -    return error;
> -}
> -
> -static void
> -dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id)
> -{
> -    struct dp_netdev *dp = get_dp_netdev(dpif);
> -    struct dp_netdev_queue *q;
> -    uint64_t seq;
> -
> -    fat_rwlock_rdlock(&dp->queue_rwlock);
> -
> -    if (!dp_netdev_recv_check(dp, handler_id)) {
> -        goto out;
> -    }
> -
> -    q = &dp->handler_queues[handler_id];
> -    ovs_mutex_lock(&q->mutex);
> -    seq = seq_read(q->seq);
> -    if (q->head != q->tail) {
> -        poll_immediate_wake();
> -    } else {
> -        seq_wait(q->seq, seq);
> -    }
> -
> -    ovs_mutex_unlock(&q->mutex);
> -
> -out:
> -    fat_rwlock_unlock(&dp->queue_rwlock);
> -}
> -
> -static void
> -dpif_netdev_recv_purge(struct dpif *dpif)
> -{
> -    struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
> -
> -    fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock);
> -    dp_netdev_purge_queues(dpif_netdev->dp);
> -    fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock);
> -}
>
>  /* Creates and returns a new 'struct dp_netdev_actions', with a reference count
>   * of 1, whose actions are a copy of from the 'ofpacts_len' bytes of
> @@ -1919,6 +1717,36 @@ reload:
>  }
>
>  static void
> +dp_netdev_disable_upcall(struct dp_netdev *dp)
> +    OVS_ACQUIRES(dp->upcall_rwlock)
> +{
> +    fat_rwlock_wrlock(&dp->upcall_rwlock);
> +}
> +
> +static void
> +dpif_netdev_disable_upcall(struct dpif *dpif)
> +    OVS_NO_THREAD_SAFETY_ANALYSIS
> +{
> +    struct dp_netdev *dp = get_dp_netdev(dpif);
> +    dp_netdev_disable_upcall(dp);
> +}
> +
> +static void
> +dp_netdev_enable_upcall(struct dp_netdev *dp)
> +    OVS_RELEASES(dp->upcall_rwlock)
> +{
> +    fat_rwlock_unlock(&dp->upcall_rwlock);
> +}
> +
> +static void
> +dpif_netdev_enable_upcall(struct dpif *dpif)
> +    OVS_NO_THREAD_SAFETY_ANALYSIS
> +{
> +    struct dp_netdev *dp = get_dp_netdev(dpif);
> +    dp_netdev_enable_upcall(dp);
> +}
> +
> +static void
>  dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
>  {
>      int i;
> @@ -2056,6 +1884,7 @@ static void
>  dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
>                  struct pkt_metadata *md)
>  {
> +    struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER;
>      struct packet_batch batches[NETDEV_MAX_RX_BATCH];
>      struct netdev_flow_key keys[NETDEV_MAX_RX_BATCH];
>      const struct miniflow *mfs[NETDEV_MAX_RX_BATCH]; /* NULL at bad packets. */
> @@ -2087,17 +1916,11 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
>          }
>
>          if (OVS_UNLIKELY(!rules[i])) {
> +            struct ofpbuf *buf = &packets[i]->ofpbuf;
>
>              dp_netdev_count_packet(dp, DP_STAT_MISS, 1);
> -
> -            if (OVS_LIKELY(dp->handler_queues)) {
> -                uint32_t hash = miniflow_hash_5tuple(mfs[i], 0);
> -                struct ofpbuf *buf = &packets[i]->ofpbuf;
> -
> -                dp_netdev_output_userspace(dp, buf, hash % dp->n_handlers,
> -                                           DPIF_UC_MISS, mfs[i], NULL);
> -            }
> -
> +            dp_netdev_queue_userspace_packet(&q, buf, DPIF_UC_MISS,
> +                                             mfs[i], NULL);
>              dpif_packet_delete(packets[i]);
>              continue;
>          }
> @@ -2127,6 +1950,10 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
>      for (i = 0; i < n_batches; i++) {
>          packet_batch_execute(&batches[i], dp);
>      }
> +
> +    if (q.packet_count) {
> +        dp_netdev_execute_userspace_queue(&q, dp);
> +    }
>  }
>
>  static void
> @@ -2145,12 +1972,11 @@ dp_netdev_queue_userspace_packet(struct dp_netdev_queue *q,
>                                   struct ofpbuf *packet, int type,
>                                   const struct miniflow *key,
>                                   const struct nlattr *userdata)
> -OVS_REQUIRES(q->mutex)
>  {
> -    if (q->head - q->tail < MAX_QUEUE_LEN) {
> -        struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
> -        struct dpif_upcall *upcall = &u->upcall;
> -        struct ofpbuf *buf = &u->buf;
> +    if (q->packet_count < MAX_QUEUE_LEN) {
> +        int cnt = q->packet_count;
> +        struct dpif_upcall *upcall = &q->upcalls[cnt];
> +        struct ofpbuf *buf = &q->bufs[cnt];
>          size_t buf_size;
>          struct flow flow;
>
> @@ -2173,7 +1999,7 @@ OVS_REQUIRES(q->mutex)
>          /* Put userdata. */
>          if (userdata) {
>              upcall->userdata = ofpbuf_put(buf, userdata,
> -                                          NLA_ALIGN(userdata->nla_len));
> +                                           NLA_ALIGN(userdata->nla_len));
>          }
>
>          /* We have to perform a copy of the packet, because we cannot send DPDK
>          * buffers. */
> @@ -2183,35 +2009,43 @@ OVS_REQUIRES(q->mutex)
>                     ofpbuf_size(packet)));
>          ofpbuf_set_size(&upcall->packet, ofpbuf_size(packet));
>
> -        seq_change(q->seq);
> -
> +        q->packet_count++;
>          return 0;
>      } else {
>          return ENOBUFS;
>      }
>  }
>
> -static int
> -dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
> -                           int queue_no, int type,
> -                           const struct miniflow *key,
> -                           const struct nlattr *userdata)
> +static void
> +dp_netdev_purge_userspace_queue(struct dp_netdev_queue *q)
>  {
> -    struct dp_netdev_queue *q;
> -    int error;
> +    struct dpif_upcall *upcalls = q->upcalls;
> +    struct ofpbuf *bufs = q->bufs;
> +    int cnt = q->packet_count;
> +    int i;
>
> -    fat_rwlock_rdlock(&dp->queue_rwlock);
> -    q = &dp->handler_queues[queue_no];
> -    ovs_mutex_lock(&q->mutex);
> -    error = dp_netdev_queue_userspace_packet(q, packet, type, key,
> -                                             userdata);
> -    if (error == ENOBUFS) {
> -        dp_netdev_count_packet(dp, DP_STAT_LOST, 1);
> +    for (i = 0; i < cnt; i++) {
> +        ofpbuf_uninit(&bufs[i]);
> +        ofpbuf_uninit(&upcalls[i].packet);
>      }
> -    ovs_mutex_unlock(&q->mutex);
> -    fat_rwlock_unlock(&dp->queue_rwlock);
> +}
>
> -    return error;
> +static void
> +dp_netdev_execute_userspace_queue(struct dp_netdev_queue *q,
> +                                  struct dp_netdev *dp)
> +{
> +    struct dpif_upcall *upcalls = q->upcalls;
> +    struct ofpbuf *bufs = q->bufs;
> +    int cnt = q->packet_count;
> +
> +    if (!fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
> +        ovs_assert(dp->upcall_cb);
> +        dp->upcall_cb(dp->dpif, upcalls, bufs, cnt);
> +        fat_rwlock_unlock(&dp->upcall_rwlock);
> +    } else {
> +        dp_netdev_purge_userspace_queue(q);
> +    }
> +    q->packet_count = 0;
>  }
>
>  struct dp_netdev_execute_aux {
> @@ -2219,6 +2053,13 @@ struct dp_netdev_execute_aux {
>  };
>
>  static void
> +dpif_netdev_register_upcall_cb(struct dpif *dpif, exec_upcall_cb *cb)
> +{
> +    struct dp_netdev *dp = get_dp_netdev(dpif);
> +    dp->upcall_cb = cb;
> +}
> +
> +static void
>  dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
>                struct pkt_metadata *md,
>                const struct nlattr *a, bool may_steal)
> @@ -2245,6 +2086,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
>      case OVS_ACTION_ATTR_USERSPACE: {
>          const struct nlattr *userdata;
>          struct netdev_flow_key key;
> +        struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER;
>
>          userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
>
> @@ -2257,15 +2099,17 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
>
>              miniflow_extract(packet, md, &key.flow);
>
> -            dp_netdev_output_userspace(aux->dp, packet,
> -                                       miniflow_hash_5tuple(&key.flow, 0)
> -                                       % aux->dp->n_handlers,
> -                                       DPIF_UC_ACTION, &key.flow,
> -                                       userdata);
> +            dp_netdev_queue_userspace_packet(&q, packet,
> +                                             DPIF_UC_ACTION, &key.flow,
> +                                             userdata);
>              if (may_steal) {
>                  dpif_packet_delete(packets[i]);
>              }
>          }
>
> +        if (q.packet_count) {
> +            dp_netdev_execute_userspace_queue(&q, aux->dp);
> +        }
>          break;
>      }
>
> @@ -2391,12 +2235,15 @@ const struct dpif_class dpif_netdev_class = {
>      dpif_netdev_flow_dump_next,
>      dpif_netdev_execute,
>      NULL,                       /* operate */
> -    dpif_netdev_recv_set,
> -    dpif_netdev_handlers_set,
> +    NULL,                       /* recv_set */
> +    NULL,                       /* handlers_set */
>      dpif_netdev_queue_to_priority,
> -    dpif_netdev_recv,
> -    dpif_netdev_recv_wait,
> -    dpif_netdev_recv_purge,
> +    NULL,                       /* recv */
> +    NULL,                       /* recv_wait */
> +    NULL,                       /* recv_purge */
> +    dpif_netdev_register_upcall_cb,
> +    dpif_netdev_enable_upcall,
> +    dpif_netdev_disable_upcall,
>  };
>
>  static void
> diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h
> index 0f42d7a..410fcfa 100644
> --- a/lib/dpif-netdev.h
> +++ b/lib/dpif-netdev.h
> @@ -20,6 +20,7 @@
>  #include <stdbool.h>
>  #include <stddef.h>
>  #include <stdint.h>
> +#include "dpif.h"
>  #include "openvswitch/types.h"
>  #include "ofpbuf.h"
>  #include "packets.h"
> diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
> index 6a06cf8..bf24a9d 100644
> --- a/lib/dpif-provider.h
> +++ b/lib/dpif-provider.h
> @@ -411,6 +411,22 @@ struct dpif_class {
>      /* Throws away any queued upcalls that 'dpif' currently has ready to
>       * return. */
>      void (*recv_purge)(struct dpif *dpif);
> +
> +    /* For datapaths that run in userspace (i.e. dpif-netdev), threads polling
> +     * for incoming packets can directly call upcall functions instead of
> +     * offloading packet processing to separate handler threads.  Datapaths
> +     * that directly call upcall functions should use the functions below to
> +     * to register an upcall function and enable / disable upcalls.
> +     *
> +     * Registers an upcall callback function with 'dpif'. This is only used if
> +     * if 'dpif' directly executes upcall functions. */
> +    void (*register_upcall_cb)(struct dpif *, exec_upcall_cb *);
> +
> +    /* Enables upcalls if 'dpif' directly executes upcall functions. */
> +    void (*enable_upcall)(struct dpif *);
> +
> +    /* Disables upcalls if 'dpif' directly executes upcall functions. */
> +    void (*disable_upcall)(struct dpif *);
>  };
>
>  extern const struct dpif_class dpif_linux_class;
> diff --git a/lib/dpif.c b/lib/dpif.c
> index a325805..3501569 100644
> --- a/lib/dpif.c
> +++ b/lib/dpif.c
> @@ -1305,8 +1305,12 @@ dpif_upcall_type_to_string(enum dpif_upcall_type type)
>  int
>  dpif_recv_set(struct dpif *dpif, bool enable)
>  {
> -    int error = dpif->dpif_class->recv_set(dpif, enable);
> -    log_operation(dpif, "recv_set", error);
> +    int error = 0;
> +
> +    if (dpif->dpif_class->recv_set) {
> +        error = dpif->dpif_class->recv_set(dpif, enable);
> +        log_operation(dpif, "recv_set", error);
> +    }
>      return error;
>  }
>
> @@ -1333,11 +1337,61 @@ dpif_recv_set(struct dpif *dpif, bool enable)
>  int
>  dpif_handlers_set(struct dpif *dpif, uint32_t n_handlers)
>  {
> -    int error = dpif->dpif_class->handlers_set(dpif, n_handlers);
> -    log_operation(dpif, "handlers_set", error);
> +    int error = 0;
> +
> +    if (dpif->dpif_class->handlers_set) {
> +        error = dpif->dpif_class->handlers_set(dpif, n_handlers);
> +        log_operation(dpif, "handlers_set", error);
> +    }
>      return error;
>  }
>
> +void
> +dpif_register_upcall_cb(struct dpif *dpif, exec_upcall_cb *cb)
> +{
> +    if (dpif->dpif_class->register_upcall_cb) {
> +        dpif->dpif_class->register_upcall_cb(dpif, cb);
> +    }
> +}
> +
> +void
> +dpif_enable_upcall(struct dpif *dpif)
> +{
> +    if (dpif->dpif_class->enable_upcall) {
> +        dpif->dpif_class->enable_upcall(dpif);
> +    }
> +}
> +
> +void
> +dpif_disable_upcall(struct dpif *dpif)
> +{
> +    if (dpif->dpif_class->disable_upcall) {
> +        dpif->dpif_class->disable_upcall(dpif);
> +    }
> +}
> +
> +void
> +dpif_print_packet(struct dpif *dpif, struct dpif_upcall *upcall)
> +{
> +    if (!VLOG_DROP_DBG(&dpmsg_rl)) {
> +        struct ds flow;
> +        char *packet;
> +
> +        packet = ofp_packet_to_string(ofpbuf_data(&upcall->packet),
> +                                      ofpbuf_size(&upcall->packet));
> +
> +        ds_init(&flow);
> +        odp_flow_key_format(upcall->key, upcall->key_len, &flow);
> +
> +        VLOG_DBG("%s: %s upcall:\n%s\n%s",
> +                 dpif_name(dpif), dpif_upcall_type_to_string(upcall->type),
> +                 ds_cstr(&flow), packet);
> +
> +        ds_destroy(&flow);
> +        free(packet);
> +    }
> +}
> +
>  /* Polls for an upcall from 'dpif' for an upcall handler.  Since there
>   * there can be multiple poll loops, 'handler_id' is needed as index to
>   * identify the corresponding poll loop.  If successful, stores the upcall
> @@ -1360,25 +1414,15 @@ int
>  dpif_recv(struct dpif *dpif, uint32_t handler_id, struct dpif_upcall *upcall,
>            struct ofpbuf *buf)
>  {
> -    int error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
> -    if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) {
> -        struct ds flow;
> -        char *packet;
> +    int error = EAGAIN;
>
> -        packet = ofp_packet_to_string(ofpbuf_data(&upcall->packet),
> -                                      ofpbuf_size(&upcall->packet));
> -
> -        ds_init(&flow);
> -        odp_flow_key_format(upcall->key, upcall->key_len, &flow);
> -
> -        VLOG_DBG("%s: %s upcall:\n%s\n%s",
> -                 dpif_name(dpif), dpif_upcall_type_to_string(upcall->type),
> -                 ds_cstr(&flow), packet);
> -
> -        ds_destroy(&flow);
> -        free(packet);
> -    } else if (error && error != EAGAIN) {
> -        log_operation(dpif, "recv", error);
> +    if (dpif->dpif_class->recv) {
> +        error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
> +        if (!error) {
> +            dpif_print_packet(dpif, upcall);
> +        } else if (error != EAGAIN) {
> +            log_operation(dpif, "recv", error);
> +        }
>      }
>      return error;
>  }
> @@ -1401,7 +1445,9 @@ dpif_recv_purge(struct dpif *dpif)
>  void
>  dpif_recv_wait(struct dpif *dpif, uint32_t handler_id)
>  {
> -    dpif->dpif_class->recv_wait(dpif, handler_id);
> +    if (dpif->dpif_class->recv_wait) {
> +        dpif->dpif_class->recv_wait(dpif, handler_id);
> +    }
>  }
>
>  /* Obtains the NetFlow engine type and engine ID for 'dpif' into '*engine_type'
> diff --git a/lib/dpif.h b/lib/dpif.h
> index 94bcacc..8d8e43a 100644
> --- a/lib/dpif.h
> +++ b/lib/dpif.h
> @@ -671,12 +671,20 @@ struct dpif_upcall {
>      struct nlattr *userdata;    /* Argument to OVS_ACTION_ATTR_USERSPACE. */
>  };
>
> +typedef void exec_upcall_cb(struct dpif *, struct dpif_upcall *,
> +                            struct ofpbuf *, int cnt);
> +
>  int dpif_recv_set(struct dpif *, bool enable);
>  int dpif_handlers_set(struct dpif *, uint32_t n_handlers);
>  int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *,
>                struct ofpbuf *);
>  void dpif_recv_purge(struct dpif *);
>  void dpif_recv_wait(struct dpif *, uint32_t handler_id);
> +void dpif_register_upcall_cb(struct dpif *, exec_upcall_cb *);
> +void dpif_enable_upcall(struct dpif *);
> +void dpif_disable_upcall(struct dpif *);
> +
> +void dpif_print_packet(struct dpif *, struct dpif_upcall *);
>
>  /* Miscellaneous. */
>
> diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
> index df33643..51433e3 100644
> --- a/ofproto/ofproto-dpif-upcall.c
> +++ b/ofproto/ofproto-dpif-upcall.c
> @@ -201,7 +201,9 @@ static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
>
>  static size_t read_upcalls(struct handler *,
>                             struct upcall upcalls[UPCALL_MAX_BATCH]);
> -static void handle_upcalls(struct handler *, struct upcall *, size_t n_upcalls);
> +static void free_upcall(struct upcall *);
> +static int convert_upcall(struct udpif *, struct upcall *);
> +static void handle_upcalls(struct udpif *, struct upcall *, size_t n_upcalls);
>  static void udpif_stop_threads(struct udpif *);
>  static void udpif_start_threads(struct udpif *, size_t n_handlers,
>                                  size_t n_revalidators);
> @@ -266,6 +268,8 @@ udpif_create(struct dpif_backer *backer, struct dpif *dpif)
>      atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
>      ovs_mutex_init(&udpif->n_flows_mutex);
>
> +    dpif_register_upcall_cb(dpif, exec_upcalls);
> +
>      return udpif;
>  }
>
> @@ -317,6 +321,8 @@ udpif_stop_threads(struct udpif *udpif)
>          xpthread_join(udpif->revalidators[i].thread, NULL);
>      }
>
> +    dpif_disable_upcall(udpif->dpif);
> +
>      for (i = 0; i < udpif->n_revalidators; i++) {
>          struct revalidator *revalidator = &udpif->revalidators[i];
>
> @@ -367,6 +373,8 @@ udpif_start_threads(struct udpif *udpif, size_t n_handlers,
>                        "handler", udpif_upcall_handler, handler);
>      }
>
> +    dpif_enable_upcall(udpif->dpif);
> +
>      ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators);
>      udpif->reval_exit = false;
>      udpif->revalidators = xzalloc(udpif->n_revalidators
> @@ -539,12 +547,10 @@ udpif_upcall_handler(void *arg)
>              latch_wait(&udpif->exit_latch);
>              poll_block();
>          } else {
> -            handle_upcalls(handler, upcalls, n_upcalls);
> +            handle_upcalls(handler->udpif, upcalls, n_upcalls);
>
>              for (i = 0; i < n_upcalls; i++) {
> -                xlate_out_uninit(&upcalls[i].xout);
> -                ofpbuf_uninit(&upcalls[i].dpif_upcall.packet);
> -                ofpbuf_uninit(&upcalls[i].upcall_buf);
> +                free_upcall(&upcalls[i]);
>              }
>          }
>          coverage_clear();
> @@ -751,6 +757,63 @@ upcall_init(struct upcall *upcall, struct flow *flow, struct ofpbuf *packet,
>      xlate_actions(&xin, &upcall->xout);
>  }
>
> +void
> +free_upcall(struct upcall *upcall)
> +{
> +    xlate_out_uninit(&upcall->xout);
> +    ofpbuf_uninit(&upcall->dpif_upcall.packet);
> +    ofpbuf_uninit(&upcall->upcall_buf);
> +}
> +
> +static struct udpif *
> +find_udpif(struct dpif *dpif)
> +{
> +    struct udpif *udpif;
> +
> +    LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
> +        if (udpif->dpif == dpif) {
> +            return udpif;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +void
> +exec_upcalls(struct dpif *dpif, struct dpif_upcall *dupcalls,
> +             struct ofpbuf *bufs, int cnt)
> +{
> +    struct upcall upcalls[UPCALL_MAX_BATCH];
> +    size_t n_upcalls = 0;
> +    struct udpif *udpif;
> +    int i;
> +
> +    ovs_assert(cnt <= UPCALL_MAX_BATCH);
> +
> +    udpif = find_udpif(dpif);
> +    ovs_assert(udpif);
> +
> +    for (i = 0; i < cnt; i++) {
> +        struct upcall *upcall = &upcalls[n_upcalls];
> +        struct dpif_upcall *dupcall = &dupcalls[i];
> +        struct ofpbuf *buf = &bufs[i];
> +
> +        upcall->dpif_upcall = *dupcall;
> +        upcall->upcall_buf = *buf;
> +
> +        dpif_print_packet(dpif, dupcall);
> +        if (!convert_upcall(udpif, upcall)) {
> +            n_upcalls += 1;
> +        }
> +    }
> +
> +    if (n_upcalls) {
> +        handle_upcalls(udpif, upcalls, n_upcalls);
> +        for (i = 0; i < n_upcalls; i++) {
> +            free_upcall(&upcalls[i]);
> +        }
> +    }
> +}
> +
>  /* Reads and classifies upcalls.  Returns the number of upcalls successfully
>   * read. */
>  static size_t
> @@ -764,14 +827,6 @@ read_upcalls(struct handler *handler,
>      /* Try reading UPCALL_MAX_BATCH upcalls from dpif. */
>      for (i = 0; i < UPCALL_MAX_BATCH; i++) {
>          struct upcall *upcall = &upcalls[n_upcalls];
> -        struct dpif_upcall *dupcall;
> -        struct ofpbuf *packet;
> -        struct ofproto_dpif *ofproto;
> -        struct dpif_sflow *sflow;
> -        struct dpif_ipfix *ipfix;
> -        struct flow flow;
> -        enum upcall_type type;
> -        odp_port_t odp_in_port;
>          int error;
>
>          ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
> @@ -783,91 +838,107 @@ read_upcalls(struct handler *handler,
>              break;
>          }
>
> -        dupcall = &upcall->dpif_upcall;
> -        packet = &dupcall->packet;
> -        error = xlate_receive(udpif->backer, packet, dupcall->key,
> -                              dupcall->key_len, &flow,
> -                              &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
> -        if (error) {
> -            if (error == ENODEV) {
> -                /* Received packet on datapath port for which we couldn't
> -                 * associate an ofproto.  This can happen if a port is removed
> -                 * while traffic is being received.  Print a rate-limited
> -                 * message in case it happens frequently.  Install a drop flow
> -                 * so that future packets of the flow are inexpensively dropped
> -                 * in the kernel. */
> -                VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
> -                             "port %"PRIu32, odp_in_port);
> -                dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
> -                              dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
> -                              NULL);
> -            }
> -            goto destroy_upcall;
> +        if (!convert_upcall(udpif, upcall)) {
> +            n_upcalls += 1;
>          }
> +    }
> +    return n_upcalls;
> +}
>
> -        type = classify_upcall(upcall);
> -        if (type == MISS_UPCALL) {
> -            upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
> -            n_upcalls++;
> -            continue;
> -        }
> +int
> +convert_upcall(struct udpif *udpif, struct upcall *upcall)
> +{
> +    struct dpif_upcall *dupcall = &upcall->dpif_upcall;
> +    struct ofpbuf *packet = &dupcall->packet;
> +    struct ofproto_dpif *ofproto;
> +    struct dpif_sflow *sflow;
> +    struct dpif_ipfix *ipfix;
> +    struct flow flow;
> +    enum upcall_type type;
> +    odp_port_t odp_in_port;
> +    int error;
>
> -        switch (type) {
> -        case SFLOW_UPCALL:
> -            if (sflow) {
> -                union user_action_cookie cookie;
> +    error = xlate_receive(udpif->backer, packet, dupcall->key,
> +                          dupcall->key_len, &flow,
> +                          &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
>
> -                memset(&cookie, 0, sizeof cookie);
> -                memcpy(&cookie, nl_attr_get(dupcall->userdata),
> -                       sizeof cookie.sflow);
> -                dpif_sflow_received(sflow, packet, &flow, odp_in_port,
> -                                    &cookie);
> -            }
> -            break;
> -        case IPFIX_UPCALL:
> -            if (ipfix) {
> -                dpif_ipfix_bridge_sample(ipfix, packet, &flow);
> -            }
> -            break;
> -        case FLOW_SAMPLE_UPCALL:
> -            if (ipfix) {
> -                union user_action_cookie cookie;
> -
> -                memset(&cookie, 0, sizeof cookie);
> -                memcpy(&cookie, nl_attr_get(dupcall->userdata),
> -                       sizeof cookie.flow_sample);
> -
> -                /* The flow reflects exactly the contents of the packet.
> -                 * Sample the packet using it. */
> -                dpif_ipfix_flow_sample(ipfix, packet, &flow,
> -                                       cookie.flow_sample.collector_set_id,
> -                                       cookie.flow_sample.probability,
> -                                       cookie.flow_sample.obs_domain_id,
> -                                       cookie.flow_sample.obs_point_id);
> -            }
> -            break;
> -        case BAD_UPCALL:
> -            break;
> -        case MISS_UPCALL:
> -            OVS_NOT_REACHED();
> +    if (error) {
> +        if (error == ENODEV) {
> +            /* Received packet on datapath port for which we couldn't
> +             * associate an ofproto.  This can happen if a port is removed
> +             * while traffic is being received.  Print a rate-limited
> +             * message in case it happens frequently.  Install a drop flow
> +             * so that future packets of the flow are inexpensively dropped
> +             * in the kernel. */
> +            VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
> +                         "port %"PRIu32, odp_in_port);
> +            dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
> +                          dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
> +                          NULL);
>          }
> +        goto destroy_upcall;
> +    }
>
> -        dpif_ipfix_unref(ipfix);
> -        dpif_sflow_unref(sflow);
> +    type = classify_upcall(upcall);
> +    if (type == MISS_UPCALL) {
> +        upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
> +        return error;
> +    }
>
> -destroy_upcall:
> -        ofpbuf_uninit(&upcall->dpif_upcall.packet);
> -        ofpbuf_uninit(&upcall->upcall_buf);
> +    switch (type) {
> +    case SFLOW_UPCALL:
> +        if (sflow) {
> +            union user_action_cookie cookie;
> +
> +            memset(&cookie, 0, sizeof cookie);
> +            memcpy(&cookie, nl_attr_get(dupcall->userdata),
> +                   sizeof cookie.sflow);
> +            dpif_sflow_received(sflow, packet, &flow, odp_in_port,
> +                                &cookie);
> +        }
> +        break;
> +    case IPFIX_UPCALL:
> +        if (ipfix) {
> +            dpif_ipfix_bridge_sample(ipfix, packet, &flow);
> +        }
> +        break;
> +    case FLOW_SAMPLE_UPCALL:
> +        if (ipfix) {
> +            union user_action_cookie cookie;
> +
> +            memset(&cookie, 0, sizeof cookie);
> +            memcpy(&cookie, nl_attr_get(dupcall->userdata),
> +                   sizeof cookie.flow_sample);
> +
> +            /* The flow reflects exactly the contents of the packet.
> +             * Sample the packet using it. */
> +            dpif_ipfix_flow_sample(ipfix, packet, &flow,
> +                                   cookie.flow_sample.collector_set_id,
> +                                   cookie.flow_sample.probability,
> +                                   cookie.flow_sample.obs_domain_id,
> +                                   cookie.flow_sample.obs_point_id);
> +        }
> +        break;
> +    case BAD_UPCALL:
> +        break;
> +    case MISS_UPCALL:
> +        OVS_NOT_REACHED();
>      }
>
> -    return n_upcalls;
> +    dpif_ipfix_unref(ipfix);
> +    dpif_sflow_unref(sflow);
> +    error = EAGAIN;
> +
> +destroy_upcall:
> +    ofpbuf_uninit(&upcall->dpif_upcall.packet);
> +    ofpbuf_uninit(&upcall->upcall_buf);
> +    return error;
>  }
>
>  static void
> -handle_upcalls(struct handler *handler, struct upcall *upcalls,
> +handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
>                 size_t n_upcalls)
>  {
> -    struct udpif *udpif = handler->udpif;
>      struct dpif_op *opsp[UPCALL_MAX_BATCH * 2];
>      struct dpif_op ops[UPCALL_MAX_BATCH * 2];
>      size_t n_ops, i;
> diff --git a/ofproto/ofproto-dpif-upcall.h b/ofproto/ofproto-dpif-upcall.h
> index 8c4b655..2b197ad 100644
> --- a/ofproto/ofproto-dpif-upcall.h
> +++ b/ofproto/ofproto-dpif-upcall.h
> @@ -19,6 +19,8 @@
>
>  struct dpif;
>  struct dpif_backer;
> +struct dpif_upcall;
> +struct ofpbuf;
>  struct seq;
>  struct simap;
>
> @@ -26,6 +28,9 @@ struct simap;
>   * them.  Additionally, it's responsible for maintaining the datapath flow
>   * table. */
>
> +void exec_upcalls(struct dpif *, struct dpif_upcall *, struct ofpbuf *,
> +                  int cnt);
> +
>  struct udpif *udpif_create(struct dpif_backer *, struct dpif *);
>  void udpif_run(struct udpif *udpif);
>  void udpif_set_threads(struct udpif *, size_t n_handlers,
> --
> 1.7.9.5

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev