On Mon, Jul 21, 2014 at 2:11 PM, Ethan Jackson <et...@nicira.com> wrote:
> I think we could take this further by getting rid of the flow put and > execute, and having exec_upcalls instead just return the necessary > actions and mask. At any rate, this is a good starting point. > > In exec_upcalls, we assert that cnt is less than UPCALL_MAX_BATCH > which seems wrong. It's much less than the netdev RX batch size, so > I'm pretty sure we're going to violate this assertion on occasion. > Could we just fill up a batch, execute it, and then fill up another if > it's too big? We should probably make UPCALL_MAX_BATCH 64 in that > case > > Fixed this. > I don't see why we need dp_netdev_disable_upcall or > dp_netdev_purge_userspace_queue, they could just be merged into the > functions that call them. > > dp_netdev_disable_upcall is added, so create_dp_netdev() does not need OVS_NO_THREAD_SAFETY_ANALYSIS. This is because it creates and locks dp->upcall_mutex, but create_dp_netdev() cannot have an OVS_ACQUIRES annotation since it has not been created yet. I'll fix dp_netdev_purge_userspace_queue() though. > The fact that exec_upcalls calls dpif_print_packet() is a bit weird, > feels like a layering violation. I'm conflicted though, what do you > think? > > Not sure why this would be since we call dpif_recv in ofproto-upcall. > Ethan > > On Sun, Jul 20, 2014 at 3:37 PM, Ryan Wilson <wr...@nicira.com> wrote: > > Typically, kernel datapath threads send upcalls to userspace where > > handler threads process the upcalls. For TAP and DPDK devices, the > > datapath threads operate in userspace, so there is no need for > > separate handler threads. > > > > This patch allows userspace datapath threads to directly call the > > ofproto upcall functions, eliminating the need for handler threads > > for datapaths of type 'netdev'. 
> > > > Signed-off-by: Ryan Wilson <wr...@nicira.com> > > --- > > v2: Fix race condition found during perf test > > v3: Addressed Daniele's comments > > v4: Addressed Ben's style comments, added packet batching > > v5: Rebase > > v6: Another rebase > > v7: Rebase > > --- > > lib/dpif-linux.c | 3 + > > lib/dpif-netdev.c | 395 > +++++++++++++---------------------------- > > lib/dpif-netdev.h | 1 + > > lib/dpif-provider.h | 16 ++ > > lib/dpif.c | 92 +++++++--- > > lib/dpif.h | 8 + > > ofproto/ofproto-dpif-upcall.c | 239 ++++++++++++++++--------- > > ofproto/ofproto-dpif-upcall.h | 5 + > > 8 files changed, 378 insertions(+), 381 deletions(-) > > > > diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c > > index 4475731..b98413d 100644 > > --- a/lib/dpif-linux.c > > +++ b/lib/dpif-linux.c > > @@ -1934,6 +1934,9 @@ const struct dpif_class dpif_linux_class = { > > dpif_linux_recv, > > dpif_linux_recv_wait, > > dpif_linux_recv_purge, > > + NULL, /* register_upcall_cb */ > > + NULL, /* enable_upcall */ > > + NULL, /* disable_upcall */ > > }; > > > > static int > > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c > > index 09220b6..7f53111 100644 > > --- a/lib/dpif-netdev.c > > +++ b/lib/dpif-netdev.c > > @@ -78,11 +78,6 @@ DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, > 0) > > /* Configuration parameters. */ > > enum { MAX_FLOWS = 65536 }; /* Maximum number of flows in flow > table. */ > > > > -/* Queues. */ > > -enum { MAX_QUEUE_LEN = 128 }; /* Maximum number of packets per queue. > */ > > -enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 }; > > -BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN)); > > - > > /* Protects against changes to 'dp_netdevs'. 
*/ > > static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER; > > > > @@ -90,27 +85,18 @@ static struct ovs_mutex dp_netdev_mutex = > OVS_MUTEX_INITIALIZER; > > static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex) > > = SHASH_INITIALIZER(&dp_netdevs); > > > > -struct dp_netdev_upcall { > > - struct dpif_upcall upcall; /* Queued upcall information. */ > > - struct ofpbuf buf; /* ofpbuf instance for upcall.packet. */ > > -}; > > +enum { MAX_QUEUE_LEN = 50 }; /* Maximum number of packets per > userspace > > + * queue. */ > > > > -/* A queue passing packets from a struct dp_netdev to its clients > (handlers). > > - * > > - * > > - * Thread-safety > > - * ============= > > - * > > - * Any access at all requires the owning 'dp_netdev''s queue_rwlock and > > - * its own mutex. */ > > struct dp_netdev_queue { > > - struct ovs_mutex mutex; > > - struct seq *seq; /* Incremented whenever a packet is queued. */ > > - struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED; > > - unsigned int head OVS_GUARDED; > > - unsigned int tail OVS_GUARDED; > > + unsigned int packet_count; > > + > > + struct dpif_upcall upcalls[MAX_QUEUE_LEN]; > > + struct ofpbuf bufs[MAX_QUEUE_LEN]; > > }; > > > > +#define DP_NETDEV_QUEUE_INITIALIZER { .packet_count = 0 } > > + > > /* Datapath based on the network device interface from netdev.h. > > * > > * > > @@ -125,11 +111,11 @@ struct dp_netdev_queue { > > * dp_netdev_mutex (global) > > * port_mutex > > * flow_mutex > > - * queue_rwlock > > */ > > struct dp_netdev { > > const struct dpif_class *const class; > > const char *const name; > > + struct dpif *dpif; > > struct ovs_refcount ref_cnt; > > atomic_flag destroyed; > > > > @@ -142,15 +128,6 @@ struct dp_netdev { > > struct classifier cls; > > struct cmap flow_table OVS_GUARDED; /* Flow table. */ > > > > - /* Queues. > > - * > > - * 'queue_rwlock' protects the modification of 'handler_queues' and > > - * 'n_handlers'. 
The queue elements are protected by its > > - * 'handler_queues''s mutex. */ > > - struct fat_rwlock queue_rwlock; > > - struct dp_netdev_queue *handler_queues; > > - uint32_t n_handlers; > > - > > /* Statistics. > > * > > * ovsthread_stats is internally synchronized. */ > > @@ -163,6 +140,11 @@ struct dp_netdev { > > struct cmap ports; > > struct seq *port_seq; /* Incremented whenever a port changes. > */ > > > > + /* Protects access to ofproto-dpif-upcall interface during > revalidator > > + * thread synchronization. */ > > + struct fat_rwlock upcall_rwlock; > > + exec_upcall_cb *upcall_cb; /* Callback function for executing > upcalls. */ > > + > > /* Forwarding threads. */ > > struct latch exit_latch; > > struct pmd_thread *pmd_threads; > > @@ -339,14 +321,14 @@ static int do_add_port(struct dp_netdev *dp, const > char *devname, > > OVS_REQUIRES(dp->port_mutex); > > static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *) > > OVS_REQUIRES(dp->port_mutex); > > -static void dp_netdev_destroy_all_queues(struct dp_netdev *dp) > > - OVS_REQ_WRLOCK(dp->queue_rwlock); > > static int dpif_netdev_open(const struct dpif_class *, const char *name, > > bool create, struct dpif **); > > -static int dp_netdev_output_userspace(struct dp_netdev *dp, struct > ofpbuf *, > > - int queue_no, int type, > > - const struct miniflow *, > > - const struct nlattr *userdata); > > +static int dp_netdev_queue_userspace_packet(struct dp_netdev_queue *, > > + struct ofpbuf *, int type, > > + const struct miniflow *, > > + const struct nlattr *); > > +static void dp_netdev_execute_userspace_queue(struct dp_netdev_queue *, > > + struct dp_netdev *); > > static void dp_netdev_execute_actions(struct dp_netdev *dp, > > struct dpif_packet **, int c, > > bool may_steal, struct > pkt_metadata *, > > @@ -357,6 +339,7 @@ static void dp_netdev_port_input(struct dp_netdev > *dp, > > odp_port_t port_no); > > > > static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n); > > +static 
void dp_netdev_disable_upcall(struct dp_netdev *); > > > > static struct dpif_netdev * > > dpif_netdev_cast(const struct dpif *dpif) > > @@ -484,14 +467,17 @@ create_dp_netdev(const char *name, const struct > dpif_class *class, > > classifier_init(&dp->cls, NULL); > > cmap_init(&dp->flow_table); > > > > - fat_rwlock_init(&dp->queue_rwlock); > > - > > ovsthread_stats_init(&dp->stats); > > > > ovs_mutex_init(&dp->port_mutex); > > cmap_init(&dp->ports); > > dp->port_seq = seq_create(); > > latch_init(&dp->exit_latch); > > + fat_rwlock_init(&dp->upcall_rwlock); > > + > > + /* Disable upcalls by default. */ > > + dp_netdev_disable_upcall(dp); > > + dp->upcall_cb = NULL; > > > > ovs_mutex_lock(&dp->port_mutex); > > error = do_add_port(dp, name, "internal", ODPP_LOCAL); > > @@ -523,31 +509,13 @@ dpif_netdev_open(const struct dpif_class *class, > const char *name, > > } > > if (!error) { > > *dpifp = create_dpif_netdev(dp); > > + dp->dpif = *dpifp; > > } > > ovs_mutex_unlock(&dp_netdev_mutex); > > > > return error; > > } > > > > -static void > > -dp_netdev_purge_queues(struct dp_netdev *dp) > > - OVS_REQ_WRLOCK(dp->queue_rwlock) > > -{ > > - int i; > > - > > - for (i = 0; i < dp->n_handlers; i++) { > > - struct dp_netdev_queue *q = &dp->handler_queues[i]; > > - > > - ovs_mutex_lock(&q->mutex); > > - while (q->tail != q->head) { > > - struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & > QUEUE_MASK]; > > - ofpbuf_uninit(&u->upcall.packet); > > - ofpbuf_uninit(&u->buf); > > - } > > - ovs_mutex_unlock(&q->mutex); > > - } > > -} > > - > > /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp' > > * through the 'dp_netdevs' shash while freeing 'dp'. 
*/ > > static void > > @@ -576,17 +544,12 @@ dp_netdev_free(struct dp_netdev *dp) > > } > > ovsthread_stats_destroy(&dp->stats); > > > > - fat_rwlock_wrlock(&dp->queue_rwlock); > > - dp_netdev_destroy_all_queues(dp); > > - fat_rwlock_unlock(&dp->queue_rwlock); > > - > > - fat_rwlock_destroy(&dp->queue_rwlock); > > - > > classifier_destroy(&dp->cls); > > cmap_destroy(&dp->flow_table); > > ovs_mutex_destroy(&dp->flow_mutex); > > seq_destroy(dp->port_seq); > > cmap_destroy(&dp->ports); > > + fat_rwlock_destroy(&dp->upcall_rwlock); > > latch_destroy(&dp->exit_latch); > > free(CONST_CAST(char *, dp->name)); > > free(dp); > > @@ -1559,80 +1522,6 @@ dpif_netdev_execute(struct dpif *dpif, struct > dpif_execute *execute) > > return 0; > > } > > > > -static void > > -dp_netdev_destroy_all_queues(struct dp_netdev *dp) > > - OVS_REQ_WRLOCK(dp->queue_rwlock) > > -{ > > - size_t i; > > - > > - dp_netdev_purge_queues(dp); > > - > > - for (i = 0; i < dp->n_handlers; i++) { > > - struct dp_netdev_queue *q = &dp->handler_queues[i]; > > - > > - ovs_mutex_destroy(&q->mutex); > > - seq_destroy(q->seq); > > - } > > - free(dp->handler_queues); > > - dp->handler_queues = NULL; > > - dp->n_handlers = 0; > > -} > > - > > -static void > > -dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers) > > - OVS_REQ_WRLOCK(dp->queue_rwlock) > > -{ > > - if (dp->n_handlers != n_handlers) { > > - size_t i; > > - > > - dp_netdev_destroy_all_queues(dp); > > - > > - dp->n_handlers = n_handlers; > > - dp->handler_queues = xzalloc(n_handlers * sizeof > *dp->handler_queues); > > - > > - for (i = 0; i < n_handlers; i++) { > > - struct dp_netdev_queue *q = &dp->handler_queues[i]; > > - > > - ovs_mutex_init(&q->mutex); > > - q->seq = seq_create(); > > - } > > - } > > -} > > - > > -static int > > -dpif_netdev_recv_set(struct dpif *dpif, bool enable) > > -{ > > - struct dp_netdev *dp = get_dp_netdev(dpif); > > - > > - if ((dp->handler_queues != NULL) == enable) { > > - return 0; > > - } > > - > > - 
fat_rwlock_wrlock(&dp->queue_rwlock); > > - if (!enable) { > > - dp_netdev_destroy_all_queues(dp); > > - } else { > > - dp_netdev_refresh_queues(dp, 1); > > - } > > - fat_rwlock_unlock(&dp->queue_rwlock); > > - > > - return 0; > > -} > > - > > -static int > > -dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers) > > -{ > > - struct dp_netdev *dp = get_dp_netdev(dpif); > > - > > - fat_rwlock_wrlock(&dp->queue_rwlock); > > - if (dp->handler_queues) { > > - dp_netdev_refresh_queues(dp, n_handlers); > > - } > > - fat_rwlock_unlock(&dp->queue_rwlock); > > - > > - return 0; > > -} > > - > > static int > > dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED, > > uint32_t queue_id, uint32_t *priority) > > @@ -1641,97 +1530,6 @@ dpif_netdev_queue_to_priority(const struct dpif > *dpif OVS_UNUSED, > > return 0; > > } > > > > -static bool > > -dp_netdev_recv_check(const struct dp_netdev *dp, const uint32_t > handler_id) > > - OVS_REQ_RDLOCK(dp->queue_rwlock) > > -{ > > - static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); > > - > > - if (!dp->handler_queues) { > > - VLOG_WARN_RL(&rl, "receiving upcall disabled"); > > - return false; > > - } > > - > > - if (handler_id >= dp->n_handlers) { > > - VLOG_WARN_RL(&rl, "handler index out of bound"); > > - return false; > > - } > > - > > - return true; > > -} > > - > > -static int > > -dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id, > > - struct dpif_upcall *upcall, struct ofpbuf *buf) > > -{ > > - struct dp_netdev *dp = get_dp_netdev(dpif); > > - struct dp_netdev_queue *q; > > - int error = 0; > > - > > - fat_rwlock_rdlock(&dp->queue_rwlock); > > - > > - if (!dp_netdev_recv_check(dp, handler_id)) { > > - error = EAGAIN; > > - goto out; > > - } > > - > > - q = &dp->handler_queues[handler_id]; > > - ovs_mutex_lock(&q->mutex); > > - if (q->head != q->tail) { > > - struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & > QUEUE_MASK]; > > - > > - *upcall = u->upcall; > > - > > - 
ofpbuf_uninit(buf); > > - *buf = u->buf; > > - } else { > > - error = EAGAIN; > > - } > > - ovs_mutex_unlock(&q->mutex); > > - > > -out: > > - fat_rwlock_unlock(&dp->queue_rwlock); > > - > > - return error; > > -} > > - > > -static void > > -dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id) > > -{ > > - struct dp_netdev *dp = get_dp_netdev(dpif); > > - struct dp_netdev_queue *q; > > - uint64_t seq; > > - > > - fat_rwlock_rdlock(&dp->queue_rwlock); > > - > > - if (!dp_netdev_recv_check(dp, handler_id)) { > > - goto out; > > - } > > - > > - q = &dp->handler_queues[handler_id]; > > - ovs_mutex_lock(&q->mutex); > > - seq = seq_read(q->seq); > > - if (q->head != q->tail) { > > - poll_immediate_wake(); > > - } else { > > - seq_wait(q->seq, seq); > > - } > > - > > - ovs_mutex_unlock(&q->mutex); > > - > > -out: > > - fat_rwlock_unlock(&dp->queue_rwlock); > > -} > > - > > -static void > > -dpif_netdev_recv_purge(struct dpif *dpif) > > -{ > > - struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif); > > - > > - fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock); > > - dp_netdev_purge_queues(dpif_netdev->dp); > > - fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock); > > -} > > > > /* Creates and returns a new 'struct dp_netdev_actions', with a > reference count > > * of 1, whose actions are a copy of from the 'ofpacts_len' bytes of > > @@ -1919,6 +1717,36 @@ reload: > > } > > > > static void > > +dp_netdev_disable_upcall(struct dp_netdev *dp) > > + OVS_ACQUIRES(dp->upcall_rwlock) > > +{ > > + fat_rwlock_wrlock(&dp->upcall_rwlock); > > +} > > + > > +static void > > +dpif_netdev_disable_upcall(struct dpif *dpif) > > + OVS_NO_THREAD_SAFETY_ANALYSIS > > +{ > > + struct dp_netdev *dp = get_dp_netdev(dpif); > > + dp_netdev_disable_upcall(dp); > > +} > > + > > +static void > > +dp_netdev_enable_upcall(struct dp_netdev *dp) > > + OVS_RELEASES(dp->upcall_rwlock) > > +{ > > + fat_rwlock_unlock(&dp->upcall_rwlock); > > +} > > + > > +static void > > 
+dpif_netdev_enable_upcall(struct dpif *dpif) > > + OVS_NO_THREAD_SAFETY_ANALYSIS > > +{ > > + struct dp_netdev *dp = get_dp_netdev(dpif); > > + dp_netdev_enable_upcall(dp); > > +} > > + > > +static void > > dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n) > > { > > int i; > > @@ -2056,6 +1884,7 @@ static void > > dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int > cnt, > > struct pkt_metadata *md) > > { > > + struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER; > > struct packet_batch batches[NETDEV_MAX_RX_BATCH]; > > struct netdev_flow_key keys[NETDEV_MAX_RX_BATCH]; > > const struct miniflow *mfs[NETDEV_MAX_RX_BATCH]; /* NULL at bad > packets. */ > > @@ -2087,17 +1916,11 @@ dp_netdev_input(struct dp_netdev *dp, struct > dpif_packet **packets, int cnt, > > } > > > > if (OVS_UNLIKELY(!rules[i])) { > > + struct ofpbuf *buf = &packets[i]->ofpbuf; > > > > dp_netdev_count_packet(dp, DP_STAT_MISS, 1); > > - > > - if (OVS_LIKELY(dp->handler_queues)) { > > - uint32_t hash = miniflow_hash_5tuple(mfs[i], 0); > > - struct ofpbuf *buf = &packets[i]->ofpbuf; > > - > > - dp_netdev_output_userspace(dp, buf, hash % > dp->n_handlers, > > - DPIF_UC_MISS, mfs[i], NULL); > > - } > > - > > + dp_netdev_queue_userspace_packet(&q, buf, DPIF_UC_MISS, > > + mfs[i], NULL); > > dpif_packet_delete(packets[i]); > > continue; > > } > > @@ -2127,6 +1950,10 @@ dp_netdev_input(struct dp_netdev *dp, struct > dpif_packet **packets, int cnt, > > for (i = 0; i < n_batches; i++) { > > packet_batch_execute(&batches[i], dp); > > } > > + > > + if (q.packet_count) { > > + dp_netdev_execute_userspace_queue(&q, dp); > > + } > > } > > > > static void > > @@ -2145,12 +1972,11 @@ dp_netdev_queue_userspace_packet(struct > dp_netdev_queue *q, > > struct ofpbuf *packet, int type, > > const struct miniflow *key, > > const struct nlattr *userdata) > > -OVS_REQUIRES(q->mutex) > > { > > - if (q->head - q->tail < MAX_QUEUE_LEN) { > > - struct dp_netdev_upcall *u = &q->upcalls[q->head++ & 
> QUEUE_MASK]; > > - struct dpif_upcall *upcall = &u->upcall; > > - struct ofpbuf *buf = &u->buf; > > + if (q->packet_count < MAX_QUEUE_LEN) { > > + int cnt = q->packet_count; > > + struct dpif_upcall *upcall = &q->upcalls[cnt]; > > + struct ofpbuf *buf = &q->bufs[cnt]; > > size_t buf_size; > > struct flow flow; > > > > @@ -2173,7 +1999,7 @@ OVS_REQUIRES(q->mutex) > > /* Put userdata. */ > > if (userdata) { > > upcall->userdata = ofpbuf_put(buf, userdata, > > - NLA_ALIGN(userdata->nla_len)); > > + NLA_ALIGN(userdata->nla_len)); > > } > > > > /* We have to perform a copy of the packet, because we cannot > send DPDK > > @@ -2183,35 +2009,43 @@ OVS_REQUIRES(q->mutex) > > ofpbuf_size(packet))); > > ofpbuf_set_size(&upcall->packet, ofpbuf_size(packet)); > > > > - seq_change(q->seq); > > - > > + q->packet_count++; > > return 0; > > } else { > > return ENOBUFS; > > } > > } > > > > -static int > > -dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet, > > - int queue_no, int type, > > - const struct miniflow *key, > > - const struct nlattr *userdata) > > +static void > > +dp_netdev_purge_userspace_queue(struct dp_netdev_queue *q) > > { > > - struct dp_netdev_queue *q; > > - int error; > > + struct dpif_upcall *upcalls = q->upcalls; > > + struct ofpbuf *bufs = q->bufs; > > + int cnt = q->packet_count; > > + int i; > > > > - fat_rwlock_rdlock(&dp->queue_rwlock); > > - q = &dp->handler_queues[queue_no]; > > - ovs_mutex_lock(&q->mutex); > > - error = dp_netdev_queue_userspace_packet(q, packet, type, key, > > - userdata); > > - if (error == ENOBUFS) { > > - dp_netdev_count_packet(dp, DP_STAT_LOST, 1); > > + for (i = 0; i < cnt; i++) { > > + ofpbuf_uninit(&bufs[i]); > > + ofpbuf_uninit(&upcalls[i].packet); > > } > > - ovs_mutex_unlock(&q->mutex); > > - fat_rwlock_unlock(&dp->queue_rwlock); > > +} > > > > - return error; > > +static void > > +dp_netdev_execute_userspace_queue(struct dp_netdev_queue *q, > > + struct dp_netdev *dp) > > +{ > > + struct dpif_upcall 
*upcalls = q->upcalls; > > + struct ofpbuf *bufs = q->bufs; > > + int cnt = q->packet_count; > > + > > + if (!fat_rwlock_tryrdlock(&dp->upcall_rwlock)) { > > + ovs_assert(dp->upcall_cb); > > + dp->upcall_cb(dp->dpif, upcalls, bufs, cnt); > > + fat_rwlock_unlock(&dp->upcall_rwlock); > > + } else { > > + dp_netdev_purge_userspace_queue(q); > > + } > > + q->packet_count = 0; > > } > > > > struct dp_netdev_execute_aux { > > @@ -2219,6 +2053,13 @@ struct dp_netdev_execute_aux { > > }; > > > > static void > > +dpif_netdev_register_upcall_cb(struct dpif *dpif, exec_upcall_cb *cb) > > +{ > > + struct dp_netdev *dp = get_dp_netdev(dpif); > > + dp->upcall_cb = cb; > > +} > > + > > +static void > > dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt, > > struct pkt_metadata *md, > > const struct nlattr *a, bool may_steal) > > @@ -2245,6 +2086,7 @@ dp_execute_cb(void *aux_, struct dpif_packet > **packets, int cnt, > > case OVS_ACTION_ATTR_USERSPACE: { > > const struct nlattr *userdata; > > struct netdev_flow_key key; > > + struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER; > > > > userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA); > > > > @@ -2257,15 +2099,17 @@ dp_execute_cb(void *aux_, struct dpif_packet > **packets, int cnt, > > > > miniflow_extract(packet, md, &key.flow); > > > > - dp_netdev_output_userspace(aux->dp, packet, > > - miniflow_hash_5tuple(&key.flow, > 0) > > - % aux->dp->n_handlers, > > - DPIF_UC_ACTION, &key.flow, > > - userdata); > > + dp_netdev_queue_userspace_packet(&q, packet, > > + DPIF_UC_ACTION, &key.flow, > > + userdata); > > if (may_steal) { > > dpif_packet_delete(packets[i]); > > } > > } > > + > > + if (q.packet_count) { > > + dp_netdev_execute_userspace_queue(&q, aux->dp); > > + } > > break; > > } > > > > @@ -2391,12 +2235,15 @@ const struct dpif_class dpif_netdev_class = { > > dpif_netdev_flow_dump_next, > > dpif_netdev_execute, > > NULL, /* operate */ > > - dpif_netdev_recv_set, > > - dpif_netdev_handlers_set, > > + 
NULL, /* recv_set */ > > + NULL, /* handlers_set */ > > dpif_netdev_queue_to_priority, > > - dpif_netdev_recv, > > - dpif_netdev_recv_wait, > > - dpif_netdev_recv_purge, > > + NULL, /* recv */ > > + NULL, /* recv_wait */ > > + NULL, /* recv_purge */ > > + dpif_netdev_register_upcall_cb, > > + dpif_netdev_enable_upcall, > > + dpif_netdev_disable_upcall, > > }; > > > > static void > > diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h > > index 0f42d7a..410fcfa 100644 > > --- a/lib/dpif-netdev.h > > +++ b/lib/dpif-netdev.h > > @@ -20,6 +20,7 @@ > > #include <stdbool.h> > > #include <stddef.h> > > #include <stdint.h> > > +#include "dpif.h" > > #include "openvswitch/types.h" > > #include "ofpbuf.h" > > #include "packets.h" > > diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h > > index 6a06cf8..bf24a9d 100644 > > --- a/lib/dpif-provider.h > > +++ b/lib/dpif-provider.h > > @@ -411,6 +411,22 @@ struct dpif_class { > > /* Throws away any queued upcalls that 'dpif' currently has ready to > > * return. */ > > void (*recv_purge)(struct dpif *dpif); > > + > > + /* For datapaths that run in userspace (i.e. dpif-netdev), threads > polling > > + * for incoming packets can directly call upcall functions instead > of > > + * offloading packet processing to separate handler threads. > Datapaths > > + * that directly call upcall functions should use the functions > below to > > + * to register an upcall function and enable / disable upcalls. > > + * > > + * Registers an upcall callback function with 'dpif'. This is only > used if > > + * if 'dpif' directly executes upcall functions. */ > > + void (*register_upcall_cb)(struct dpif *, exec_upcall_cb *); > > + > > + /* Enables upcalls if 'dpif' directly executes upcall functions. */ > > + void (*enable_upcall)(struct dpif *); > > + > > + /* Disables upcalls if 'dpif' directly executes upcall functions. 
*/ > > + void (*disable_upcall)(struct dpif *); > > }; > > > > extern const struct dpif_class dpif_linux_class; > > diff --git a/lib/dpif.c b/lib/dpif.c > > index a325805..3501569 100644 > > --- a/lib/dpif.c > > +++ b/lib/dpif.c > > @@ -1305,8 +1305,12 @@ dpif_upcall_type_to_string(enum dpif_upcall_type > type) > > int > > dpif_recv_set(struct dpif *dpif, bool enable) > > { > > - int error = dpif->dpif_class->recv_set(dpif, enable); > > - log_operation(dpif, "recv_set", error); > > + int error = 0; > > + > > + if (dpif->dpif_class->recv_set) { > > + error = dpif->dpif_class->recv_set(dpif, enable); > > + log_operation(dpif, "recv_set", error); > > + } > > return error; > > } > > > > @@ -1333,11 +1337,61 @@ dpif_recv_set(struct dpif *dpif, bool enable) > > int > > dpif_handlers_set(struct dpif *dpif, uint32_t n_handlers) > > { > > - int error = dpif->dpif_class->handlers_set(dpif, n_handlers); > > - log_operation(dpif, "handlers_set", error); > > + int error = 0; > > + > > + if (dpif->dpif_class->handlers_set) { > > + error = dpif->dpif_class->handlers_set(dpif, n_handlers); > > + log_operation(dpif, "handlers_set", error); > > + } > > return error; > > } > > > > +void > > +dpif_register_upcall_cb(struct dpif *dpif, exec_upcall_cb *cb) > > +{ > > + if (dpif->dpif_class->register_upcall_cb) { > > + dpif->dpif_class->register_upcall_cb(dpif, cb); > > + } > > +} > > + > > +void > > +dpif_enable_upcall(struct dpif *dpif) > > +{ > > + if (dpif->dpif_class->enable_upcall) { > > + dpif->dpif_class->enable_upcall(dpif); > > + } > > +} > > + > > +void > > +dpif_disable_upcall(struct dpif *dpif) > > +{ > > + if (dpif->dpif_class->disable_upcall) { > > + dpif->dpif_class->disable_upcall(dpif); > > + } > > +} > > + > > +void > > +dpif_print_packet(struct dpif *dpif, struct dpif_upcall *upcall) > > +{ > > + if (!VLOG_DROP_DBG(&dpmsg_rl)) { > > + struct ds flow; > > + char *packet; > > + > > + packet = ofp_packet_to_string(ofpbuf_data(&upcall->packet), > > + 
ofpbuf_size(&upcall->packet)); > > + > > + ds_init(&flow); > > + odp_flow_key_format(upcall->key, upcall->key_len, &flow); > > + > > + VLOG_DBG("%s: %s upcall:\n%s\n%s", > > + dpif_name(dpif), > dpif_upcall_type_to_string(upcall->type), > > + ds_cstr(&flow), packet); > > + > > + ds_destroy(&flow); > > + free(packet); > > + } > > +} > > + > > /* Polls for an upcall from 'dpif' for an upcall handler. Since there > > * there can be multiple poll loops, 'handler_id' is needed as index to > > * identify the corresponding poll loop. If successful, stores the > upcall > > @@ -1360,25 +1414,15 @@ int > > dpif_recv(struct dpif *dpif, uint32_t handler_id, struct dpif_upcall > *upcall, > > struct ofpbuf *buf) > > { > > - int error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf); > > - if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) { > > - struct ds flow; > > - char *packet; > > + int error = EAGAIN; > > > > - packet = ofp_packet_to_string(ofpbuf_data(&upcall->packet), > > - ofpbuf_size(&upcall->packet)); > > - > > - ds_init(&flow); > > - odp_flow_key_format(upcall->key, upcall->key_len, &flow); > > - > > - VLOG_DBG("%s: %s upcall:\n%s\n%s", > > - dpif_name(dpif), > dpif_upcall_type_to_string(upcall->type), > > - ds_cstr(&flow), packet); > > - > > - ds_destroy(&flow); > > - free(packet); > > - } else if (error && error != EAGAIN) { > > - log_operation(dpif, "recv", error); > > + if (dpif->dpif_class->recv) { > > + error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf); > > + if (!error) { > > + dpif_print_packet(dpif, upcall); > > + } else if (error != EAGAIN) { > > + log_operation(dpif, "recv", error); > > + } > > } > > return error; > > } > > @@ -1401,7 +1445,9 @@ dpif_recv_purge(struct dpif *dpif) > > void > > dpif_recv_wait(struct dpif *dpif, uint32_t handler_id) > > { > > - dpif->dpif_class->recv_wait(dpif, handler_id); > > + if (dpif->dpif_class->recv_wait) { > > + dpif->dpif_class->recv_wait(dpif, handler_id); > > + } > > } > > > > /* Obtains the NetFlow engine 
type and engine ID for 'dpif' into > '*engine_type' > > diff --git a/lib/dpif.h b/lib/dpif.h > > index 94bcacc..8d8e43a 100644 > > --- a/lib/dpif.h > > +++ b/lib/dpif.h > > @@ -671,12 +671,20 @@ struct dpif_upcall { > > struct nlattr *userdata; /* Argument to > OVS_ACTION_ATTR_USERSPACE. */ > > }; > > > > +typedef void exec_upcall_cb(struct dpif *, struct dpif_upcall *, > > + struct ofpbuf *, int cnt); > > + > > int dpif_recv_set(struct dpif *, bool enable); > > int dpif_handlers_set(struct dpif *, uint32_t n_handlers); > > int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *, > > struct ofpbuf *); > > void dpif_recv_purge(struct dpif *); > > void dpif_recv_wait(struct dpif *, uint32_t handler_id); > > +void dpif_register_upcall_cb(struct dpif *, exec_upcall_cb *); > > +void dpif_enable_upcall(struct dpif *); > > +void dpif_disable_upcall(struct dpif *); > > + > > +void dpif_print_packet(struct dpif *, struct dpif_upcall *); > > > > /* Miscellaneous. */ > > > > diff --git a/ofproto/ofproto-dpif-upcall.c > b/ofproto/ofproto-dpif-upcall.c > > index df33643..51433e3 100644 > > --- a/ofproto/ofproto-dpif-upcall.c > > +++ b/ofproto/ofproto-dpif-upcall.c > > @@ -201,7 +201,9 @@ static struct list all_udpifs = > LIST_INITIALIZER(&all_udpifs); > > > > static size_t read_upcalls(struct handler *, > > struct upcall upcalls[UPCALL_MAX_BATCH]); > > -static void handle_upcalls(struct handler *, struct upcall *, size_t > n_upcalls); > > +static void free_upcall(struct upcall *); > > +static int convert_upcall(struct udpif *, struct upcall *); > > +static void handle_upcalls(struct udpif *, struct upcall *, size_t > n_upcalls); > > static void udpif_stop_threads(struct udpif *); > > static void udpif_start_threads(struct udpif *, size_t n_handlers, > > size_t n_revalidators); > > @@ -266,6 +268,8 @@ udpif_create(struct dpif_backer *backer, struct dpif > *dpif) > > atomic_init(&udpif->n_flows_timestamp, LLONG_MIN); > > ovs_mutex_init(&udpif->n_flows_mutex); > > > 
> + dpif_register_upcall_cb(dpif, exec_upcalls); > > + > > return udpif; > > } > > > > @@ -317,6 +321,8 @@ udpif_stop_threads(struct udpif *udpif) > > xpthread_join(udpif->revalidators[i].thread, NULL); > > } > > > > + dpif_disable_upcall(udpif->dpif); > > + > > for (i = 0; i < udpif->n_revalidators; i++) { > > struct revalidator *revalidator = &udpif->revalidators[i]; > > > > @@ -367,6 +373,8 @@ udpif_start_threads(struct udpif *udpif, size_t > n_handlers, > > "handler", udpif_upcall_handler, handler); > > } > > > > + dpif_enable_upcall(udpif->dpif); > > + > > ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators); > > udpif->reval_exit = false; > > udpif->revalidators = xzalloc(udpif->n_revalidators > > @@ -539,12 +547,10 @@ udpif_upcall_handler(void *arg) > > latch_wait(&udpif->exit_latch); > > poll_block(); > > } else { > > - handle_upcalls(handler, upcalls, n_upcalls); > > + handle_upcalls(handler->udpif, upcalls, n_upcalls); > > > > for (i = 0; i < n_upcalls; i++) { > > - xlate_out_uninit(&upcalls[i].xout); > > - ofpbuf_uninit(&upcalls[i].dpif_upcall.packet); > > - ofpbuf_uninit(&upcalls[i].upcall_buf); > > + free_upcall(&upcalls[i]); > > } > > } > > coverage_clear(); > > @@ -751,6 +757,63 @@ upcall_init(struct upcall *upcall, struct flow > *flow, struct ofpbuf *packet, > > xlate_actions(&xin, &upcall->xout); > > } > > > > +void > > +free_upcall(struct upcall *upcall) > > +{ > > + xlate_out_uninit(&upcall->xout); > > + ofpbuf_uninit(&upcall->dpif_upcall.packet); > > + ofpbuf_uninit(&upcall->upcall_buf); > > +} > > + > > +static struct udpif * > > +find_udpif(struct dpif *dpif) > > +{ > > + struct udpif *udpif; > > + > > + LIST_FOR_EACH (udpif, list_node, &all_udpifs) { > > + if (udpif->dpif == dpif) { > > + return udpif; > > + } > > + } > > + return NULL; > > +} > > + > > +void > > +exec_upcalls(struct dpif *dpif, struct dpif_upcall *dupcalls, > > + struct ofpbuf *bufs, int cnt) > > +{ > > + struct upcall upcalls[UPCALL_MAX_BATCH]; > > + size_t 
n_upcalls = 0; > > + struct udpif *udpif; > > + int i; > > + > > + ovs_assert(cnt <= UPCALL_MAX_BATCH); > > + > > + udpif = find_udpif(dpif); > > + ovs_assert(udpif); > > + > > + for (i = 0; i < cnt; i++) { > > + struct upcall *upcall = &upcalls[n_upcalls]; > > + struct dpif_upcall *dupcall = &dupcalls[i]; > > + struct ofpbuf *buf = &bufs[i]; > > + > > + upcall->dpif_upcall = *dupcall; > > + upcall->upcall_buf = *buf; > > + > > + dpif_print_packet(dpif, dupcall); > > + if (!convert_upcall(udpif, upcall)) { > > + n_upcalls += 1; > > + } > > + } > > + > > + if (n_upcalls) { > > + handle_upcalls(udpif, upcalls, n_upcalls); > > + for (i = 0; i < n_upcalls; i++) { > > + free_upcall(&upcalls[i]); > > + } > > + } > > +} > > + > > /* Reads and classifies upcalls. Returns the number of upcalls > successfully > > * read. */ > > static size_t > > @@ -764,14 +827,6 @@ read_upcalls(struct handler *handler, > > /* Try reading UPCALL_MAX_BATCH upcalls from dpif. */ > > for (i = 0; i < UPCALL_MAX_BATCH; i++) { > > struct upcall *upcall = &upcalls[n_upcalls]; > > - struct dpif_upcall *dupcall; > > - struct ofpbuf *packet; > > - struct ofproto_dpif *ofproto; > > - struct dpif_sflow *sflow; > > - struct dpif_ipfix *ipfix; > > - struct flow flow; > > - enum upcall_type type; > > - odp_port_t odp_in_port; > > int error; > > > > ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub, > > @@ -783,91 +838,107 @@ read_upcalls(struct handler *handler, > > break; > > } > > > > - dupcall = &upcall->dpif_upcall; > > - packet = &dupcall->packet; > > - error = xlate_receive(udpif->backer, packet, dupcall->key, > > - dupcall->key_len, &flow, > > - &ofproto, &ipfix, &sflow, NULL, > &odp_in_port); > > - if (error) { > > - if (error == ENODEV) { > > - /* Received packet on datapath port for which we > couldn't > > - * associate an ofproto. This can happen if a port is > removed > > - * while traffic is being received. Print a > rate-limited > > - * message in case it happens frequently. 
Install a > drop flow > > - * so that future packets of the flow are inexpensively > dropped > > - * in the kernel. */ > > - VLOG_INFO_RL(&rl, "received packet on unassociated > datapath " > > - "port %"PRIu32, odp_in_port); > > - dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | > DPIF_FP_MODIFY, > > - dupcall->key, dupcall->key_len, NULL, 0, > NULL, 0, > > - NULL); > > - } > > - goto destroy_upcall; > > + if (!convert_upcall(udpif, upcall)) { > > + n_upcalls += 1; > > } > > + } > > + return n_upcalls; > > +} > > > > - type = classify_upcall(upcall); > > - if (type == MISS_UPCALL) { > > - upcall_init(upcall, &flow, packet, ofproto, dupcall, > odp_in_port); > > - n_upcalls++; > > - continue; > > - } > > +int > > +convert_upcall(struct udpif *udpif, struct upcall *upcall) > > +{ > > + struct dpif_upcall *dupcall = &upcall->dpif_upcall; > > + struct ofpbuf *packet = &dupcall->packet; > > + struct ofproto_dpif *ofproto; > > + struct dpif_sflow *sflow; > > + struct dpif_ipfix *ipfix; > > + struct flow flow; > > + enum upcall_type type; > > + odp_port_t odp_in_port; > > + int error; > > > > - switch (type) { > > - case SFLOW_UPCALL: > > - if (sflow) { > > - union user_action_cookie cookie; > > + error = xlate_receive(udpif->backer, packet, dupcall->key, > > + dupcall->key_len, &flow, > > + &ofproto, &ipfix, &sflow, NULL, &odp_in_port); > > > > - memset(&cookie, 0, sizeof cookie); > > - memcpy(&cookie, nl_attr_get(dupcall->userdata), > > - sizeof cookie.sflow); > > - dpif_sflow_received(sflow, packet, &flow, odp_in_port, > > - &cookie); > > - } > > - break; > > - case IPFIX_UPCALL: > > - if (ipfix) { > > - dpif_ipfix_bridge_sample(ipfix, packet, &flow); > > - } > > - break; > > - case FLOW_SAMPLE_UPCALL: > > - if (ipfix) { > > - union user_action_cookie cookie; > > - > > - memset(&cookie, 0, sizeof cookie); > > - memcpy(&cookie, nl_attr_get(dupcall->userdata), > > - sizeof cookie.flow_sample); > > - > > - /* The flow reflects exactly the contents of the packet. 
> > - * Sample the packet using it. */ > > - dpif_ipfix_flow_sample(ipfix, packet, &flow, > > - > cookie.flow_sample.collector_set_id, > > - cookie.flow_sample.probability, > > - cookie.flow_sample.obs_domain_id, > > - cookie.flow_sample.obs_point_id); > > - } > > - break; > > - case BAD_UPCALL: > > - break; > > - case MISS_UPCALL: > > - OVS_NOT_REACHED(); > > + if (error) { > > + if (error == ENODEV) { > > + /* Received packet on datapath port for which we couldn't > > + * associate an ofproto. This can happen if a port is > removed > > + * while traffic is being received. Print a rate-limited > > + * message in case it happens frequently. Install a drop > flow > > + * so that future packets of the flow are inexpensively > dropped > > + * in the kernel. */ > > + VLOG_INFO_RL(&rl, "received packet on unassociated datapath > " > > + "port %"PRIu32, odp_in_port); > > + dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY, > > + dupcall->key, dupcall->key_len, NULL, 0, > NULL, 0, > > + NULL); > > } > > + goto destroy_upcall; > > + } > > > > - dpif_ipfix_unref(ipfix); > > - dpif_sflow_unref(sflow); > > + type = classify_upcall(upcall); > > + if (type == MISS_UPCALL) { > > + upcall_init(upcall, &flow, packet, ofproto, dupcall, > odp_in_port); > > + return error; > > + } > > > > -destroy_upcall: > > - ofpbuf_uninit(&upcall->dpif_upcall.packet); > > - ofpbuf_uninit(&upcall->upcall_buf); > > + switch (type) { > > + case SFLOW_UPCALL: > > + if (sflow) { > > + union user_action_cookie cookie; > > + > > + memset(&cookie, 0, sizeof cookie); > > + memcpy(&cookie, nl_attr_get(dupcall->userdata), > > + sizeof cookie.sflow); > > + dpif_sflow_received(sflow, packet, &flow, odp_in_port, > > + &cookie); > > + } > > + break; > > + case IPFIX_UPCALL: > > + if (ipfix) { > > + dpif_ipfix_bridge_sample(ipfix, packet, &flow); > > + } > > + break; > > + case FLOW_SAMPLE_UPCALL: > > + if (ipfix) { > > + union user_action_cookie cookie; > > + > > + memset(&cookie, 0, sizeof cookie); > > 
+ memcpy(&cookie, nl_attr_get(dupcall->userdata), > > + sizeof cookie.flow_sample); > > + > > + /* The flow reflects exactly the contents of the packet. > > + * Sample the packet using it. */ > > + dpif_ipfix_flow_sample(ipfix, packet, &flow, > > + cookie.flow_sample.collector_set_id, > > + cookie.flow_sample.probability, > > + cookie.flow_sample.obs_domain_id, > > + cookie.flow_sample.obs_point_id); > > + } > > + break; > > + case BAD_UPCALL: > > + break; > > + case MISS_UPCALL: > > + OVS_NOT_REACHED(); > > } > > > > - return n_upcalls; > > + dpif_ipfix_unref(ipfix); > > + dpif_sflow_unref(sflow); > > + error = EAGAIN; > > + > > +destroy_upcall: > > + ofpbuf_uninit(&upcall->dpif_upcall.packet); > > + ofpbuf_uninit(&upcall->upcall_buf); > > + return error; > > } > > > > static void > > -handle_upcalls(struct handler *handler, struct upcall *upcalls, > > +handle_upcalls(struct udpif *udpif, struct upcall *upcalls, > > size_t n_upcalls) > > { > > - struct udpif *udpif = handler->udpif; > > struct dpif_op *opsp[UPCALL_MAX_BATCH * 2]; > > struct dpif_op ops[UPCALL_MAX_BATCH * 2]; > > size_t n_ops, i; > > diff --git a/ofproto/ofproto-dpif-upcall.h > b/ofproto/ofproto-dpif-upcall.h > > index 8c4b655..2b197ad 100644 > > --- a/ofproto/ofproto-dpif-upcall.h > > +++ b/ofproto/ofproto-dpif-upcall.h > > @@ -19,6 +19,8 @@ > > > > struct dpif; > > struct dpif_backer; > > +struct dpif_upcall; > > +struct ofpbuf; > > struct seq; > > struct simap; > > > > @@ -26,6 +28,9 @@ struct simap; > > * them. Additionally, it's responsible for maintaining the datapath > flow > > * table. 
*/ > > > > +void exec_upcalls(struct dpif *, struct dpif_upcall *, struct ofpbuf *, > > + int cnt); > > + > > struct udpif *udpif_create(struct dpif_backer *, struct dpif *); > > void udpif_run(struct udpif *udpif); > > void udpif_set_threads(struct udpif *, size_t n_handlers, > > -- > > 1.7.9.5 > > > > _______________________________________________ > > dev mailing list > > dev@openvswitch.org > > http://openvswitch.org/mailman/listinfo/dev > _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev