On Mon, Jul 21, 2014 at 2:11 PM, Ethan Jackson <et...@nicira.com> wrote:

> I think we could take this further by getting rid of the flow put and
> execute, and having exec_upcalls instead just return the necessary
> actions and mask.  At any rate, this is a good starting point.
>
> In exec_upcalls, we assert that cnt is less than UPCALL_MAX_BATCH
> which seems wrong.  It's much less than the netdev RX batch size, so
> I'm pretty sure we're going to violate this assertion on occasion.
> Could we just fill up a batch, execute it, and then fill up another if
> it's too big?  We should probably make UPCALL_MAX_BATCH 64 in that
> case.
>
>
Fixed this.


> I don't see why we need dp_netdev_disable_upcall or
> dp_netdev_purge_userspace_queue, they could just be merged into the
> functions that call them.
>
> dp_netdev_disable_upcall is added, so create_dp_netdev() does not need
OVS_NO_THREAD_SAFETY_ANALYSIS. This is because it creates and locks
dp->upcall_mutex, but create_dp_netdev() cannot have an OVS_ACQUIRES
annotation since the mutex has not been created yet at function entry.

I'll fix dp_netdev_purge_userspace_queue() though.


> The fact that exec_upcalls calls dpif_print_packet() is a bit weird,
> feels like a layering violation.  I'm conflicted though, what do you
> think?
>
>
Not sure why this would be a layering violation, since we already call dpif_recv in ofproto-dpif-upcall.



> Ethan
>
> On Sun, Jul 20, 2014 at 3:37 PM, Ryan Wilson <wr...@nicira.com> wrote:
> > Typically, kernel datapath threads send upcalls to userspace where
> > handler threads process the upcalls. For TAP and DPDK devices, the
> > datapath threads operate in userspace, so there is no need for
> > separate handler threads.
> >
> > This patch allows userspace datapath threads to directly call the
> > ofproto upcall functions, eliminating the need for handler threads
> > for datapaths of type 'netdev'.
> >
> > Signed-off-by: Ryan Wilson <wr...@nicira.com>
> > ---
> > v2: Fix race condition found during perf test
> > v3: Addressed Daniele's comments
> > v4: Addressed Ben's style comments, added packet batching
> > v5: Rebase
> > v6: Another rebase
> > v7: Rebase
> > ---
> >  lib/dpif-linux.c              |    3 +
> >  lib/dpif-netdev.c             |  395
> +++++++++++++----------------------------
> >  lib/dpif-netdev.h             |    1 +
> >  lib/dpif-provider.h           |   16 ++
> >  lib/dpif.c                    |   92 +++++++---
> >  lib/dpif.h                    |    8 +
> >  ofproto/ofproto-dpif-upcall.c |  239 ++++++++++++++++---------
> >  ofproto/ofproto-dpif-upcall.h |    5 +
> >  8 files changed, 378 insertions(+), 381 deletions(-)
> >
> > diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
> > index 4475731..b98413d 100644
> > --- a/lib/dpif-linux.c
> > +++ b/lib/dpif-linux.c
> > @@ -1934,6 +1934,9 @@ const struct dpif_class dpif_linux_class = {
> >      dpif_linux_recv,
> >      dpif_linux_recv_wait,
> >      dpif_linux_recv_purge,
> > +    NULL,                       /* register_upcall_cb */
> > +    NULL,                       /* enable_upcall */
> > +    NULL,                       /* disable_upcall */
> >  };
> >
> >  static int
> > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> > index 09220b6..7f53111 100644
> > --- a/lib/dpif-netdev.c
> > +++ b/lib/dpif-netdev.c
> > @@ -78,11 +78,6 @@ DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth,
> 0)
> >  /* Configuration parameters. */
> >  enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow
> table. */
> >
> > -/* Queues. */
> > -enum { MAX_QUEUE_LEN = 128 };   /* Maximum number of packets per queue.
> */
> > -enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 };
> > -BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
> > -
> >  /* Protects against changes to 'dp_netdevs'. */
> >  static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
> >
> > @@ -90,27 +85,18 @@ static struct ovs_mutex dp_netdev_mutex =
> OVS_MUTEX_INITIALIZER;
> >  static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex)
> >      = SHASH_INITIALIZER(&dp_netdevs);
> >
> > -struct dp_netdev_upcall {
> > -    struct dpif_upcall upcall;  /* Queued upcall information. */
> > -    struct ofpbuf buf;          /* ofpbuf instance for upcall.packet. */
> > -};
> > +enum { MAX_QUEUE_LEN = 50 };   /* Maximum number of packets per
> userspace
> > +                                * queue. */
> >
> > -/* A queue passing packets from a struct dp_netdev to its clients
> (handlers).
> > - *
> > - *
> > - * Thread-safety
> > - * =============
> > - *
> > - * Any access at all requires the owning 'dp_netdev''s queue_rwlock and
> > - * its own mutex. */
> >  struct dp_netdev_queue {
> > -    struct ovs_mutex mutex;
> > -    struct seq *seq;      /* Incremented whenever a packet is queued. */
> > -    struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
> > -    unsigned int head OVS_GUARDED;
> > -    unsigned int tail OVS_GUARDED;
> > +    unsigned int packet_count;
> > +
> > +    struct dpif_upcall upcalls[MAX_QUEUE_LEN];
> > +    struct ofpbuf bufs[MAX_QUEUE_LEN];
> >  };
> >
> > +#define DP_NETDEV_QUEUE_INITIALIZER { .packet_count = 0 }
> > +
> >  /* Datapath based on the network device interface from netdev.h.
> >   *
> >   *
> > @@ -125,11 +111,11 @@ struct dp_netdev_queue {
> >   *    dp_netdev_mutex (global)
> >   *    port_mutex
> >   *    flow_mutex
> > - *    queue_rwlock
> >   */
> >  struct dp_netdev {
> >      const struct dpif_class *const class;
> >      const char *const name;
> > +    struct dpif *dpif;
> >      struct ovs_refcount ref_cnt;
> >      atomic_flag destroyed;
> >
> > @@ -142,15 +128,6 @@ struct dp_netdev {
> >      struct classifier cls;
> >      struct cmap flow_table OVS_GUARDED; /* Flow table. */
> >
> > -    /* Queues.
> > -     *
> > -     * 'queue_rwlock' protects the modification of 'handler_queues' and
> > -     * 'n_handlers'.  The queue elements are protected by its
> > -     * 'handler_queues''s mutex. */
> > -    struct fat_rwlock queue_rwlock;
> > -    struct dp_netdev_queue *handler_queues;
> > -    uint32_t n_handlers;
> > -
> >      /* Statistics.
> >       *
> >       * ovsthread_stats is internally synchronized. */
> > @@ -163,6 +140,11 @@ struct dp_netdev {
> >      struct cmap ports;
> >      struct seq *port_seq;       /* Incremented whenever a port changes.
> */
> >
> > +    /* Protects access to ofproto-dpif-upcall interface during
> revalidator
> > +     * thread synchronization. */
> > +    struct fat_rwlock upcall_rwlock;
> > +    exec_upcall_cb *upcall_cb;  /* Callback function for executing
> upcalls. */
> > +
> >      /* Forwarding threads. */
> >      struct latch exit_latch;
> >      struct pmd_thread *pmd_threads;
> > @@ -339,14 +321,14 @@ static int do_add_port(struct dp_netdev *dp, const
> char *devname,
> >      OVS_REQUIRES(dp->port_mutex);
> >  static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *)
> >      OVS_REQUIRES(dp->port_mutex);
> > -static void dp_netdev_destroy_all_queues(struct dp_netdev *dp)
> > -    OVS_REQ_WRLOCK(dp->queue_rwlock);
> >  static int dpif_netdev_open(const struct dpif_class *, const char *name,
> >                              bool create, struct dpif **);
> > -static int dp_netdev_output_userspace(struct dp_netdev *dp, struct
> ofpbuf *,
> > -                                      int queue_no, int type,
> > -                                      const struct miniflow *,
> > -                                      const struct nlattr *userdata);
> > +static int dp_netdev_queue_userspace_packet(struct dp_netdev_queue *,
> > +                                            struct ofpbuf *, int type,
> > +                                            const struct miniflow *,
> > +                                            const struct nlattr *);
> > +static void dp_netdev_execute_userspace_queue(struct dp_netdev_queue *,
> > +                                              struct dp_netdev *);
> >  static void dp_netdev_execute_actions(struct dp_netdev *dp,
> >                                        struct dpif_packet **, int c,
> >                                        bool may_steal, struct
> pkt_metadata *,
> > @@ -357,6 +339,7 @@ static void dp_netdev_port_input(struct dp_netdev
> *dp,
> >                                   odp_port_t port_no);
> >
> >  static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
> > +static void dp_netdev_disable_upcall(struct dp_netdev *);
> >
> >  static struct dpif_netdev *
> >  dpif_netdev_cast(const struct dpif *dpif)
> > @@ -484,14 +467,17 @@ create_dp_netdev(const char *name, const struct
> dpif_class *class,
> >      classifier_init(&dp->cls, NULL);
> >      cmap_init(&dp->flow_table);
> >
> > -    fat_rwlock_init(&dp->queue_rwlock);
> > -
> >      ovsthread_stats_init(&dp->stats);
> >
> >      ovs_mutex_init(&dp->port_mutex);
> >      cmap_init(&dp->ports);
> >      dp->port_seq = seq_create();
> >      latch_init(&dp->exit_latch);
> > +    fat_rwlock_init(&dp->upcall_rwlock);
> > +
> > +    /* Disable upcalls by default. */
> > +    dp_netdev_disable_upcall(dp);
> > +    dp->upcall_cb = NULL;
> >
> >      ovs_mutex_lock(&dp->port_mutex);
> >      error = do_add_port(dp, name, "internal", ODPP_LOCAL);
> > @@ -523,31 +509,13 @@ dpif_netdev_open(const struct dpif_class *class,
> const char *name,
> >      }
> >      if (!error) {
> >          *dpifp = create_dpif_netdev(dp);
> > +        dp->dpif = *dpifp;
> >      }
> >      ovs_mutex_unlock(&dp_netdev_mutex);
> >
> >      return error;
> >  }
> >
> > -static void
> > -dp_netdev_purge_queues(struct dp_netdev *dp)
> > -    OVS_REQ_WRLOCK(dp->queue_rwlock)
> > -{
> > -    int i;
> > -
> > -    for (i = 0; i < dp->n_handlers; i++) {
> > -        struct dp_netdev_queue *q = &dp->handler_queues[i];
> > -
> > -        ovs_mutex_lock(&q->mutex);
> > -        while (q->tail != q->head) {
> > -            struct dp_netdev_upcall *u = &q->upcalls[q->tail++ &
> QUEUE_MASK];
> > -            ofpbuf_uninit(&u->upcall.packet);
> > -            ofpbuf_uninit(&u->buf);
> > -        }
> > -        ovs_mutex_unlock(&q->mutex);
> > -    }
> > -}
> > -
> >  /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
> >   * through the 'dp_netdevs' shash while freeing 'dp'. */
> >  static void
> > @@ -576,17 +544,12 @@ dp_netdev_free(struct dp_netdev *dp)
> >      }
> >      ovsthread_stats_destroy(&dp->stats);
> >
> > -    fat_rwlock_wrlock(&dp->queue_rwlock);
> > -    dp_netdev_destroy_all_queues(dp);
> > -    fat_rwlock_unlock(&dp->queue_rwlock);
> > -
> > -    fat_rwlock_destroy(&dp->queue_rwlock);
> > -
> >      classifier_destroy(&dp->cls);
> >      cmap_destroy(&dp->flow_table);
> >      ovs_mutex_destroy(&dp->flow_mutex);
> >      seq_destroy(dp->port_seq);
> >      cmap_destroy(&dp->ports);
> > +    fat_rwlock_destroy(&dp->upcall_rwlock);
> >      latch_destroy(&dp->exit_latch);
> >      free(CONST_CAST(char *, dp->name));
> >      free(dp);
> > @@ -1559,80 +1522,6 @@ dpif_netdev_execute(struct dpif *dpif, struct
> dpif_execute *execute)
> >      return 0;
> >  }
> >
> > -static void
> > -dp_netdev_destroy_all_queues(struct dp_netdev *dp)
> > -    OVS_REQ_WRLOCK(dp->queue_rwlock)
> > -{
> > -    size_t i;
> > -
> > -    dp_netdev_purge_queues(dp);
> > -
> > -    for (i = 0; i < dp->n_handlers; i++) {
> > -        struct dp_netdev_queue *q = &dp->handler_queues[i];
> > -
> > -        ovs_mutex_destroy(&q->mutex);
> > -        seq_destroy(q->seq);
> > -    }
> > -    free(dp->handler_queues);
> > -    dp->handler_queues = NULL;
> > -    dp->n_handlers = 0;
> > -}
> > -
> > -static void
> > -dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers)
> > -    OVS_REQ_WRLOCK(dp->queue_rwlock)
> > -{
> > -    if (dp->n_handlers != n_handlers) {
> > -        size_t i;
> > -
> > -        dp_netdev_destroy_all_queues(dp);
> > -
> > -        dp->n_handlers = n_handlers;
> > -        dp->handler_queues = xzalloc(n_handlers * sizeof
> *dp->handler_queues);
> > -
> > -        for (i = 0; i < n_handlers; i++) {
> > -            struct dp_netdev_queue *q = &dp->handler_queues[i];
> > -
> > -            ovs_mutex_init(&q->mutex);
> > -            q->seq = seq_create();
> > -        }
> > -    }
> > -}
> > -
> > -static int
> > -dpif_netdev_recv_set(struct dpif *dpif, bool enable)
> > -{
> > -    struct dp_netdev *dp = get_dp_netdev(dpif);
> > -
> > -    if ((dp->handler_queues != NULL) == enable) {
> > -        return 0;
> > -    }
> > -
> > -    fat_rwlock_wrlock(&dp->queue_rwlock);
> > -    if (!enable) {
> > -        dp_netdev_destroy_all_queues(dp);
> > -    } else {
> > -        dp_netdev_refresh_queues(dp, 1);
> > -    }
> > -    fat_rwlock_unlock(&dp->queue_rwlock);
> > -
> > -    return 0;
> > -}
> > -
> > -static int
> > -dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers)
> > -{
> > -    struct dp_netdev *dp = get_dp_netdev(dpif);
> > -
> > -    fat_rwlock_wrlock(&dp->queue_rwlock);
> > -    if (dp->handler_queues) {
> > -        dp_netdev_refresh_queues(dp, n_handlers);
> > -    }
> > -    fat_rwlock_unlock(&dp->queue_rwlock);
> > -
> > -    return 0;
> > -}
> > -
> >  static int
> >  dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
> >                                uint32_t queue_id, uint32_t *priority)
> > @@ -1641,97 +1530,6 @@ dpif_netdev_queue_to_priority(const struct dpif
> *dpif OVS_UNUSED,
> >      return 0;
> >  }
> >
> > -static bool
> > -dp_netdev_recv_check(const struct dp_netdev *dp, const uint32_t
> handler_id)
> > -    OVS_REQ_RDLOCK(dp->queue_rwlock)
> > -{
> > -    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
> > -
> > -    if (!dp->handler_queues) {
> > -        VLOG_WARN_RL(&rl, "receiving upcall disabled");
> > -        return false;
> > -    }
> > -
> > -    if (handler_id >= dp->n_handlers) {
> > -        VLOG_WARN_RL(&rl, "handler index out of bound");
> > -        return false;
> > -    }
> > -
> > -    return true;
> > -}
> > -
> > -static int
> > -dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id,
> > -                 struct dpif_upcall *upcall, struct ofpbuf *buf)
> > -{
> > -    struct dp_netdev *dp = get_dp_netdev(dpif);
> > -    struct dp_netdev_queue *q;
> > -    int error = 0;
> > -
> > -    fat_rwlock_rdlock(&dp->queue_rwlock);
> > -
> > -    if (!dp_netdev_recv_check(dp, handler_id)) {
> > -        error = EAGAIN;
> > -        goto out;
> > -    }
> > -
> > -    q = &dp->handler_queues[handler_id];
> > -    ovs_mutex_lock(&q->mutex);
> > -    if (q->head != q->tail) {
> > -        struct dp_netdev_upcall *u = &q->upcalls[q->tail++ &
> QUEUE_MASK];
> > -
> > -        *upcall = u->upcall;
> > -
> > -        ofpbuf_uninit(buf);
> > -        *buf = u->buf;
> > -    } else {
> > -        error = EAGAIN;
> > -    }
> > -    ovs_mutex_unlock(&q->mutex);
> > -
> > -out:
> > -    fat_rwlock_unlock(&dp->queue_rwlock);
> > -
> > -    return error;
> > -}
> > -
> > -static void
> > -dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id)
> > -{
> > -    struct dp_netdev *dp = get_dp_netdev(dpif);
> > -    struct dp_netdev_queue *q;
> > -    uint64_t seq;
> > -
> > -    fat_rwlock_rdlock(&dp->queue_rwlock);
> > -
> > -    if (!dp_netdev_recv_check(dp, handler_id)) {
> > -        goto out;
> > -    }
> > -
> > -    q = &dp->handler_queues[handler_id];
> > -    ovs_mutex_lock(&q->mutex);
> > -    seq = seq_read(q->seq);
> > -    if (q->head != q->tail) {
> > -        poll_immediate_wake();
> > -    } else {
> > -        seq_wait(q->seq, seq);
> > -    }
> > -
> > -    ovs_mutex_unlock(&q->mutex);
> > -
> > -out:
> > -    fat_rwlock_unlock(&dp->queue_rwlock);
> > -}
> > -
> > -static void
> > -dpif_netdev_recv_purge(struct dpif *dpif)
> > -{
> > -    struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
> > -
> > -    fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock);
> > -    dp_netdev_purge_queues(dpif_netdev->dp);
> > -    fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock);
> > -}
> >
> >  /* Creates and returns a new 'struct dp_netdev_actions', with a
> reference count
> >   * of 1, whose actions are a copy of from the 'ofpacts_len' bytes of
> > @@ -1919,6 +1717,36 @@ reload:
> >  }
> >
> >  static void
> > +dp_netdev_disable_upcall(struct dp_netdev *dp)
> > +    OVS_ACQUIRES(dp->upcall_rwlock)
> > +{
> > +    fat_rwlock_wrlock(&dp->upcall_rwlock);
> > +}
> > +
> > +static void
> > +dpif_netdev_disable_upcall(struct dpif *dpif)
> > +    OVS_NO_THREAD_SAFETY_ANALYSIS
> > +{
> > +    struct dp_netdev *dp = get_dp_netdev(dpif);
> > +    dp_netdev_disable_upcall(dp);
> > +}
> > +
> > +static void
> > +dp_netdev_enable_upcall(struct dp_netdev *dp)
> > +    OVS_RELEASES(dp->upcall_rwlock)
> > +{
> > +    fat_rwlock_unlock(&dp->upcall_rwlock);
> > +}
> > +
> > +static void
> > +dpif_netdev_enable_upcall(struct dpif *dpif)
> > +    OVS_NO_THREAD_SAFETY_ANALYSIS
> > +{
> > +    struct dp_netdev *dp = get_dp_netdev(dpif);
> > +    dp_netdev_enable_upcall(dp);
> > +}
> > +
> > +static void
> >  dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
> >  {
> >      int i;
> > @@ -2056,6 +1884,7 @@ static void
> >  dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int
> cnt,
> >                  struct pkt_metadata *md)
> >  {
> > +    struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER;
> >      struct packet_batch batches[NETDEV_MAX_RX_BATCH];
> >      struct netdev_flow_key keys[NETDEV_MAX_RX_BATCH];
> >      const struct miniflow *mfs[NETDEV_MAX_RX_BATCH]; /* NULL at bad
> packets. */
> > @@ -2087,17 +1916,11 @@ dp_netdev_input(struct dp_netdev *dp, struct
> dpif_packet **packets, int cnt,
> >          }
> >
> >          if (OVS_UNLIKELY(!rules[i])) {
> > +            struct ofpbuf *buf = &packets[i]->ofpbuf;
> >
> >              dp_netdev_count_packet(dp, DP_STAT_MISS, 1);
> > -
> > -            if (OVS_LIKELY(dp->handler_queues)) {
> > -                uint32_t hash = miniflow_hash_5tuple(mfs[i], 0);
> > -                struct ofpbuf *buf = &packets[i]->ofpbuf;
> > -
> > -                dp_netdev_output_userspace(dp, buf, hash %
> dp->n_handlers,
> > -                                           DPIF_UC_MISS, mfs[i], NULL);
> > -            }
> > -
> > +            dp_netdev_queue_userspace_packet(&q, buf, DPIF_UC_MISS,
> > +                                             mfs[i], NULL);
> >              dpif_packet_delete(packets[i]);
> >              continue;
> >          }
> > @@ -2127,6 +1950,10 @@ dp_netdev_input(struct dp_netdev *dp, struct
> dpif_packet **packets, int cnt,
> >      for (i = 0; i < n_batches; i++) {
> >          packet_batch_execute(&batches[i], dp);
> >      }
> > +
> > +    if (q.packet_count) {
> > +        dp_netdev_execute_userspace_queue(&q, dp);
> > +    }
> >  }
> >
> >  static void
> > @@ -2145,12 +1972,11 @@ dp_netdev_queue_userspace_packet(struct
> dp_netdev_queue *q,
> >                                   struct ofpbuf *packet, int type,
> >                                   const struct miniflow *key,
> >                                   const struct nlattr *userdata)
> > -OVS_REQUIRES(q->mutex)
> >  {
> > -    if (q->head - q->tail < MAX_QUEUE_LEN) {
> > -        struct dp_netdev_upcall *u = &q->upcalls[q->head++ &
> QUEUE_MASK];
> > -        struct dpif_upcall *upcall = &u->upcall;
> > -        struct ofpbuf *buf = &u->buf;
> > +    if (q->packet_count < MAX_QUEUE_LEN) {
> > +        int cnt = q->packet_count;
> > +        struct dpif_upcall *upcall = &q->upcalls[cnt];
> > +        struct ofpbuf *buf = &q->bufs[cnt];
> >          size_t buf_size;
> >          struct flow flow;
> >
> > @@ -2173,7 +1999,7 @@ OVS_REQUIRES(q->mutex)
> >          /* Put userdata. */
> >          if (userdata) {
> >              upcall->userdata = ofpbuf_put(buf, userdata,
> > -                    NLA_ALIGN(userdata->nla_len));
> > +                                          NLA_ALIGN(userdata->nla_len));
> >          }
> >
> >          /* We have to perform a copy of the packet, because we cannot
> send DPDK
> > @@ -2183,35 +2009,43 @@ OVS_REQUIRES(q->mutex)
> >                          ofpbuf_size(packet)));
> >          ofpbuf_set_size(&upcall->packet, ofpbuf_size(packet));
> >
> > -        seq_change(q->seq);
> > -
> > +        q->packet_count++;
> >          return 0;
> >      } else {
> >          return ENOBUFS;
> >      }
> >  }
> >
> > -static int
> > -dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
> > -                           int queue_no, int type,
> > -                           const struct miniflow *key,
> > -                           const struct nlattr *userdata)
> > +static void
> > +dp_netdev_purge_userspace_queue(struct dp_netdev_queue *q)
> >  {
> > -    struct dp_netdev_queue *q;
> > -    int error;
> > +    struct dpif_upcall *upcalls = q->upcalls;
> > +    struct ofpbuf *bufs = q->bufs;
> > +    int cnt = q->packet_count;
> > +    int i;
> >
> > -    fat_rwlock_rdlock(&dp->queue_rwlock);
> > -    q = &dp->handler_queues[queue_no];
> > -    ovs_mutex_lock(&q->mutex);
> > -    error = dp_netdev_queue_userspace_packet(q, packet, type, key,
> > -                                             userdata);
> > -    if (error == ENOBUFS) {
> > -        dp_netdev_count_packet(dp, DP_STAT_LOST, 1);
> > +    for (i = 0; i < cnt; i++) {
> > +        ofpbuf_uninit(&bufs[i]);
> > +        ofpbuf_uninit(&upcalls[i].packet);
> >      }
> > -    ovs_mutex_unlock(&q->mutex);
> > -    fat_rwlock_unlock(&dp->queue_rwlock);
> > +}
> >
> > -    return error;
> > +static void
> > +dp_netdev_execute_userspace_queue(struct dp_netdev_queue *q,
> > +                                  struct dp_netdev *dp)
> > +{
> > +    struct dpif_upcall *upcalls = q->upcalls;
> > +    struct ofpbuf *bufs = q->bufs;
> > +    int cnt = q->packet_count;
> > +
> > +    if (!fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
> > +        ovs_assert(dp->upcall_cb);
> > +        dp->upcall_cb(dp->dpif, upcalls, bufs, cnt);
> > +        fat_rwlock_unlock(&dp->upcall_rwlock);
> > +    } else {
> > +        dp_netdev_purge_userspace_queue(q);
> > +    }
> > +    q->packet_count = 0;
> >  }
> >
> >  struct dp_netdev_execute_aux {
> > @@ -2219,6 +2053,13 @@ struct dp_netdev_execute_aux {
> >  };
> >
> >  static void
> > +dpif_netdev_register_upcall_cb(struct dpif *dpif, exec_upcall_cb *cb)
> > +{
> > +    struct dp_netdev *dp = get_dp_netdev(dpif);
> > +    dp->upcall_cb = cb;
> > +}
> > +
> > +static void
> >  dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
> >                struct pkt_metadata *md,
> >                const struct nlattr *a, bool may_steal)
> > @@ -2245,6 +2086,7 @@ dp_execute_cb(void *aux_, struct dpif_packet
> **packets, int cnt,
> >      case OVS_ACTION_ATTR_USERSPACE: {
> >          const struct nlattr *userdata;
> >          struct netdev_flow_key key;
> > +        struct dp_netdev_queue q = DP_NETDEV_QUEUE_INITIALIZER;
> >
> >          userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
> >
> > @@ -2257,15 +2099,17 @@ dp_execute_cb(void *aux_, struct dpif_packet
> **packets, int cnt,
> >
> >              miniflow_extract(packet, md, &key.flow);
> >
> > -            dp_netdev_output_userspace(aux->dp, packet,
> > -                                       miniflow_hash_5tuple(&key.flow,
> 0)
> > -                                           % aux->dp->n_handlers,
> > -                                       DPIF_UC_ACTION, &key.flow,
> > -                                       userdata);
> > +            dp_netdev_queue_userspace_packet(&q, packet,
> > +                                             DPIF_UC_ACTION, &key.flow,
> > +                                             userdata);
> >              if (may_steal) {
> >                  dpif_packet_delete(packets[i]);
> >              }
> >          }
> > +
> > +        if (q.packet_count) {
> > +            dp_netdev_execute_userspace_queue(&q, aux->dp);
> > +        }
> >          break;
> >      }
> >
> > @@ -2391,12 +2235,15 @@ const struct dpif_class dpif_netdev_class = {
> >      dpif_netdev_flow_dump_next,
> >      dpif_netdev_execute,
> >      NULL,                       /* operate */
> > -    dpif_netdev_recv_set,
> > -    dpif_netdev_handlers_set,
> > +    NULL,                       /* recv_set */
> > +    NULL,                       /* handlers_set */
> >      dpif_netdev_queue_to_priority,
> > -    dpif_netdev_recv,
> > -    dpif_netdev_recv_wait,
> > -    dpif_netdev_recv_purge,
> > +    NULL,                       /* recv */
> > +    NULL,                       /* recv_wait */
> > +    NULL,                       /* recv_purge */
> > +    dpif_netdev_register_upcall_cb,
> > +    dpif_netdev_enable_upcall,
> > +    dpif_netdev_disable_upcall,
> >  };
> >
> >  static void
> > diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h
> > index 0f42d7a..410fcfa 100644
> > --- a/lib/dpif-netdev.h
> > +++ b/lib/dpif-netdev.h
> > @@ -20,6 +20,7 @@
> >  #include <stdbool.h>
> >  #include <stddef.h>
> >  #include <stdint.h>
> > +#include "dpif.h"
> >  #include "openvswitch/types.h"
> >  #include "ofpbuf.h"
> >  #include "packets.h"
> > diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
> > index 6a06cf8..bf24a9d 100644
> > --- a/lib/dpif-provider.h
> > +++ b/lib/dpif-provider.h
> > @@ -411,6 +411,22 @@ struct dpif_class {
> >      /* Throws away any queued upcalls that 'dpif' currently has ready to
> >       * return. */
> >      void (*recv_purge)(struct dpif *dpif);
> > +
> > +    /* For datapaths that run in userspace (i.e. dpif-netdev), threads
> polling
> > +     * for incoming packets can directly call upcall functions instead
> of
> > +     * offloading packet processing to separate handler threads.
> Datapaths
> > +     * that directly call upcall functions should use the functions
> below to
> > +     * to register an upcall function and enable / disable upcalls.
> > +     *
> > +     * Registers an upcall callback function with 'dpif'. This is only
> used if
> > +     * if 'dpif' directly executes upcall functions. */
> > +    void (*register_upcall_cb)(struct dpif *, exec_upcall_cb *);
> > +
> > +    /* Enables upcalls if 'dpif' directly executes upcall functions. */
> > +    void (*enable_upcall)(struct dpif *);
> > +
> > +    /* Disables upcalls if 'dpif' directly executes upcall functions. */
> > +    void (*disable_upcall)(struct dpif *);
> >  };
> >
> >  extern const struct dpif_class dpif_linux_class;
> > diff --git a/lib/dpif.c b/lib/dpif.c
> > index a325805..3501569 100644
> > --- a/lib/dpif.c
> > +++ b/lib/dpif.c
> > @@ -1305,8 +1305,12 @@ dpif_upcall_type_to_string(enum dpif_upcall_type
> type)
> >  int
> >  dpif_recv_set(struct dpif *dpif, bool enable)
> >  {
> > -    int error = dpif->dpif_class->recv_set(dpif, enable);
> > -    log_operation(dpif, "recv_set", error);
> > +    int error = 0;
> > +
> > +    if (dpif->dpif_class->recv_set) {
> > +        error = dpif->dpif_class->recv_set(dpif, enable);
> > +        log_operation(dpif, "recv_set", error);
> > +    }
> >      return error;
> >  }
> >
> > @@ -1333,11 +1337,61 @@ dpif_recv_set(struct dpif *dpif, bool enable)
> >  int
> >  dpif_handlers_set(struct dpif *dpif, uint32_t n_handlers)
> >  {
> > -    int error = dpif->dpif_class->handlers_set(dpif, n_handlers);
> > -    log_operation(dpif, "handlers_set", error);
> > +    int error = 0;
> > +
> > +    if (dpif->dpif_class->handlers_set) {
> > +        error = dpif->dpif_class->handlers_set(dpif, n_handlers);
> > +        log_operation(dpif, "handlers_set", error);
> > +    }
> >      return error;
> >  }
> >
> > +void
> > +dpif_register_upcall_cb(struct dpif *dpif, exec_upcall_cb *cb)
> > +{
> > +    if (dpif->dpif_class->register_upcall_cb) {
> > +        dpif->dpif_class->register_upcall_cb(dpif, cb);
> > +    }
> > +}
> > +
> > +void
> > +dpif_enable_upcall(struct dpif *dpif)
> > +{
> > +    if (dpif->dpif_class->enable_upcall) {
> > +        dpif->dpif_class->enable_upcall(dpif);
> > +    }
> > +}
> > +
> > +void
> > +dpif_disable_upcall(struct dpif *dpif)
> > +{
> > +    if (dpif->dpif_class->disable_upcall) {
> > +        dpif->dpif_class->disable_upcall(dpif);
> > +    }
> > +}
> > +
> > +void
> > +dpif_print_packet(struct dpif *dpif, struct dpif_upcall *upcall)
> > +{
> > +    if (!VLOG_DROP_DBG(&dpmsg_rl)) {
> > +        struct ds flow;
> > +        char *packet;
> > +
> > +        packet = ofp_packet_to_string(ofpbuf_data(&upcall->packet),
> > +                                      ofpbuf_size(&upcall->packet));
> > +
> > +        ds_init(&flow);
> > +        odp_flow_key_format(upcall->key, upcall->key_len, &flow);
> > +
> > +        VLOG_DBG("%s: %s upcall:\n%s\n%s",
> > +                 dpif_name(dpif),
> dpif_upcall_type_to_string(upcall->type),
> > +                 ds_cstr(&flow), packet);
> > +
> > +        ds_destroy(&flow);
> > +        free(packet);
> > +    }
> > +}
> > +
> >  /* Polls for an upcall from 'dpif' for an upcall handler.  Since there
> >   * there can be multiple poll loops, 'handler_id' is needed as index to
> >   * identify the corresponding poll loop.  If successful, stores the
> upcall
> > @@ -1360,25 +1414,15 @@ int
> >  dpif_recv(struct dpif *dpif, uint32_t handler_id, struct dpif_upcall
> *upcall,
> >            struct ofpbuf *buf)
> >  {
> > -    int error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
> > -    if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) {
> > -        struct ds flow;
> > -        char *packet;
> > +    int error = EAGAIN;
> >
> > -        packet = ofp_packet_to_string(ofpbuf_data(&upcall->packet),
> > -                                      ofpbuf_size(&upcall->packet));
> > -
> > -        ds_init(&flow);
> > -        odp_flow_key_format(upcall->key, upcall->key_len, &flow);
> > -
> > -        VLOG_DBG("%s: %s upcall:\n%s\n%s",
> > -                 dpif_name(dpif),
> dpif_upcall_type_to_string(upcall->type),
> > -                 ds_cstr(&flow), packet);
> > -
> > -        ds_destroy(&flow);
> > -        free(packet);
> > -    } else if (error && error != EAGAIN) {
> > -        log_operation(dpif, "recv", error);
> > +    if (dpif->dpif_class->recv) {
> > +        error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
> > +        if (!error) {
> > +            dpif_print_packet(dpif, upcall);
> > +        } else if (error != EAGAIN) {
> > +            log_operation(dpif, "recv", error);
> > +        }
> >      }
> >      return error;
> >  }
> > @@ -1401,7 +1445,9 @@ dpif_recv_purge(struct dpif *dpif)
> >  void
> >  dpif_recv_wait(struct dpif *dpif, uint32_t handler_id)
> >  {
> > -    dpif->dpif_class->recv_wait(dpif, handler_id);
> > +    if (dpif->dpif_class->recv_wait) {
> > +        dpif->dpif_class->recv_wait(dpif, handler_id);
> > +    }
> >  }
> >
> >  /* Obtains the NetFlow engine type and engine ID for 'dpif' into
> '*engine_type'
> > diff --git a/lib/dpif.h b/lib/dpif.h
> > index 94bcacc..8d8e43a 100644
> > --- a/lib/dpif.h
> > +++ b/lib/dpif.h
> > @@ -671,12 +671,20 @@ struct dpif_upcall {
> >      struct nlattr *userdata;    /* Argument to
> OVS_ACTION_ATTR_USERSPACE. */
> >  };
> >
> > +typedef void exec_upcall_cb(struct dpif *, struct dpif_upcall *,
> > +                            struct ofpbuf *, int cnt);
> > +
> >  int dpif_recv_set(struct dpif *, bool enable);
> >  int dpif_handlers_set(struct dpif *, uint32_t n_handlers);
> >  int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *,
> >                struct ofpbuf *);
> >  void dpif_recv_purge(struct dpif *);
> >  void dpif_recv_wait(struct dpif *, uint32_t handler_id);
> > +void dpif_register_upcall_cb(struct dpif *, exec_upcall_cb *);
> > +void dpif_enable_upcall(struct dpif *);
> > +void dpif_disable_upcall(struct dpif *);
> > +
> > +void dpif_print_packet(struct dpif *, struct dpif_upcall *);
> >
> >  /* Miscellaneous. */
> >
> > diff --git a/ofproto/ofproto-dpif-upcall.c
> b/ofproto/ofproto-dpif-upcall.c
> > index df33643..51433e3 100644
> > --- a/ofproto/ofproto-dpif-upcall.c
> > +++ b/ofproto/ofproto-dpif-upcall.c
> > @@ -201,7 +201,9 @@ static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
> >
> >  static size_t read_upcalls(struct handler *,
> >                             struct upcall upcalls[UPCALL_MAX_BATCH]);
> > -static void handle_upcalls(struct handler *, struct upcall *, size_t n_upcalls);
> > +static void free_upcall(struct upcall *);
> > +static int convert_upcall(struct udpif *, struct upcall *);
> > +static void handle_upcalls(struct udpif *, struct upcall *, size_t n_upcalls);
> >  static void udpif_stop_threads(struct udpif *);
> >  static void udpif_start_threads(struct udpif *, size_t n_handlers,
> >                                  size_t n_revalidators);
> > @@ -266,6 +268,8 @@ udpif_create(struct dpif_backer *backer, struct dpif *dpif)
> >      atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
> >      ovs_mutex_init(&udpif->n_flows_mutex);
> >
> > +    dpif_register_upcall_cb(dpif, exec_upcalls);
> > +
> >      return udpif;
> >  }
> >
> > @@ -317,6 +321,8 @@ udpif_stop_threads(struct udpif *udpif)
> >              xpthread_join(udpif->revalidators[i].thread, NULL);
> >          }
> >
> > +        dpif_disable_upcall(udpif->dpif);
> > +
> >          for (i = 0; i < udpif->n_revalidators; i++) {
> >              struct revalidator *revalidator = &udpif->revalidators[i];
> >
> > @@ -367,6 +373,8 @@ udpif_start_threads(struct udpif *udpif, size_t n_handlers,
> >                  "handler", udpif_upcall_handler, handler);
> >          }
> >
> > +        dpif_enable_upcall(udpif->dpif);
> > +
> >          ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators);
> >          udpif->reval_exit = false;
> >          udpif->revalidators = xzalloc(udpif->n_revalidators
> > @@ -539,12 +547,10 @@ udpif_upcall_handler(void *arg)
> >              latch_wait(&udpif->exit_latch);
> >              poll_block();
> >          } else {
> > -            handle_upcalls(handler, upcalls, n_upcalls);
> > +            handle_upcalls(handler->udpif, upcalls, n_upcalls);
> >
> >              for (i = 0; i < n_upcalls; i++) {
> > -                xlate_out_uninit(&upcalls[i].xout);
> > -                ofpbuf_uninit(&upcalls[i].dpif_upcall.packet);
> > -                ofpbuf_uninit(&upcalls[i].upcall_buf);
> > +                free_upcall(&upcalls[i]);
> >              }
> >          }
> >          coverage_clear();
> > @@ -751,6 +757,63 @@ upcall_init(struct upcall *upcall, struct flow *flow, struct ofpbuf *packet,
> >      xlate_actions(&xin, &upcall->xout);
> >  }
> >
> > +void
> > +free_upcall(struct upcall *upcall)
> > +{
> > +    xlate_out_uninit(&upcall->xout);
> > +    ofpbuf_uninit(&upcall->dpif_upcall.packet);
> > +    ofpbuf_uninit(&upcall->upcall_buf);
> > +}
> > +
> > +static struct udpif *
> > +find_udpif(struct dpif *dpif)
> > +{
> > +    struct udpif *udpif;
> > +
> > +    LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
> > +        if (udpif->dpif == dpif) {
> > +            return udpif;
> > +        }
> > +    }
> > +    return NULL;
> > +}
> > +
> > +void
> > +exec_upcalls(struct dpif *dpif, struct dpif_upcall *dupcalls,
> > +             struct ofpbuf *bufs, int cnt)
> > +{
> > +    struct upcall upcalls[UPCALL_MAX_BATCH];
> > +    size_t n_upcalls = 0;
> > +    struct udpif *udpif;
> > +    int i;
> > +
> > +    ovs_assert(cnt <= UPCALL_MAX_BATCH);
> > +
> > +    udpif = find_udpif(dpif);
> > +    ovs_assert(udpif);
> > +
> > +    for (i = 0; i < cnt; i++) {
> > +        struct upcall *upcall = &upcalls[n_upcalls];
> > +        struct dpif_upcall *dupcall = &dupcalls[i];
> > +        struct ofpbuf *buf = &bufs[i];
> > +
> > +        upcall->dpif_upcall = *dupcall;
> > +        upcall->upcall_buf = *buf;
> > +
> > +        dpif_print_packet(dpif, dupcall);
> > +        if (!convert_upcall(udpif, upcall)) {
> > +            n_upcalls += 1;
> > +        }
> > +    }
> > +
> > +    if (n_upcalls) {
> > +        handle_upcalls(udpif, upcalls, n_upcalls);
> > +        for (i = 0; i < n_upcalls; i++) {
> > +            free_upcall(&upcalls[i]);
> > +        }
> > +    }
> > +}
> > +
> >  /* Reads and classifies upcalls.  Returns the number of upcalls successfully
> >   * read. */
> >  static size_t
> > @@ -764,14 +827,6 @@ read_upcalls(struct handler *handler,
> >      /* Try reading UPCALL_MAX_BATCH upcalls from dpif. */
> >      for (i = 0; i < UPCALL_MAX_BATCH; i++) {
> >          struct upcall *upcall = &upcalls[n_upcalls];
> > -        struct dpif_upcall *dupcall;
> > -        struct ofpbuf *packet;
> > -        struct ofproto_dpif *ofproto;
> > -        struct dpif_sflow *sflow;
> > -        struct dpif_ipfix *ipfix;
> > -        struct flow flow;
> > -        enum upcall_type type;
> > -        odp_port_t odp_in_port;
> >          int error;
> >
> >          ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
> > @@ -783,91 +838,107 @@ read_upcalls(struct handler *handler,
> >              break;
> >          }
> >
> > -        dupcall = &upcall->dpif_upcall;
> > -        packet = &dupcall->packet;
> > -        error = xlate_receive(udpif->backer, packet, dupcall->key,
> > -                              dupcall->key_len, &flow,
> > -                              &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
> > -        if (error) {
> > -            if (error == ENODEV) {
> > -                /* Received packet on datapath port for which we couldn't
> > -                 * associate an ofproto.  This can happen if a port is removed
> > -                 * while traffic is being received.  Print a rate-limited
> > -                 * message in case it happens frequently.  Install a drop flow
> > -                 * so that future packets of the flow are inexpensively dropped
> > -                 * in the kernel. */
> > -                VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
> > -                             "port %"PRIu32, odp_in_port);
> > -                dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
> > -                              dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
> > -                              NULL);
> > -            }
> > -            goto destroy_upcall;
> > +        if (!convert_upcall(udpif, upcall)) {
> > +            n_upcalls += 1;
> >          }
> > +    }
> > +    return n_upcalls;
> > +}
> >
> > -        type = classify_upcall(upcall);
> > -        if (type == MISS_UPCALL) {
> > -            upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
> > -            n_upcalls++;
> > -            continue;
> > -        }
> > +int
> > +convert_upcall(struct udpif *udpif, struct upcall *upcall)
> > +{
> > +    struct dpif_upcall *dupcall = &upcall->dpif_upcall;
> > +    struct ofpbuf *packet = &dupcall->packet;
> > +    struct ofproto_dpif *ofproto;
> > +    struct dpif_sflow *sflow;
> > +    struct dpif_ipfix *ipfix;
> > +    struct flow flow;
> > +    enum upcall_type type;
> > +    odp_port_t odp_in_port;
> > +    int error;
> >
> > -        switch (type) {
> > -        case SFLOW_UPCALL:
> > -            if (sflow) {
> > -                union user_action_cookie cookie;
> > +    error = xlate_receive(udpif->backer, packet, dupcall->key,
> > +                          dupcall->key_len, &flow,
> > +                          &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
> >
> > -                memset(&cookie, 0, sizeof cookie);
> > -                memcpy(&cookie, nl_attr_get(dupcall->userdata),
> > -                       sizeof cookie.sflow);
> > -                dpif_sflow_received(sflow, packet, &flow, odp_in_port,
> > -                                    &cookie);
> > -            }
> > -            break;
> > -        case IPFIX_UPCALL:
> > -            if (ipfix) {
> > -                dpif_ipfix_bridge_sample(ipfix, packet, &flow);
> > -            }
> > -            break;
> > -        case FLOW_SAMPLE_UPCALL:
> > -            if (ipfix) {
> > -                union user_action_cookie cookie;
> > -
> > -                memset(&cookie, 0, sizeof cookie);
> > -                memcpy(&cookie, nl_attr_get(dupcall->userdata),
> > -                       sizeof cookie.flow_sample);
> > -
> > -                /* The flow reflects exactly the contents of the packet.
> > -                 * Sample the packet using it. */
> > -                dpif_ipfix_flow_sample(ipfix, packet, &flow,
> > -                                       cookie.flow_sample.collector_set_id,
> > -                                       cookie.flow_sample.probability,
> > -                                       cookie.flow_sample.obs_domain_id,
> > -                                       cookie.flow_sample.obs_point_id);
> > -            }
> > -            break;
> > -        case BAD_UPCALL:
> > -            break;
> > -        case MISS_UPCALL:
> > -            OVS_NOT_REACHED();
> > +    if (error) {
> > +        if (error == ENODEV) {
> > +            /* Received packet on datapath port for which we couldn't
> > +             * associate an ofproto.  This can happen if a port is removed
> > +             * while traffic is being received.  Print a rate-limited
> > +             * message in case it happens frequently.  Install a drop flow
> > +             * so that future packets of the flow are inexpensively dropped
> > +             * in the kernel. */
> > +            VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
> > +                         "port %"PRIu32, odp_in_port);
> > +            dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
> > +                          dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
> > +                          NULL);
> >          }
> > +        goto destroy_upcall;
> > +    }
> >
> > -        dpif_ipfix_unref(ipfix);
> > -        dpif_sflow_unref(sflow);
> > +    type = classify_upcall(upcall);
> > +    if (type == MISS_UPCALL) {
> > +        upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
> > +        return error;
> > +    }
> >
> > -destroy_upcall:
> > -        ofpbuf_uninit(&upcall->dpif_upcall.packet);
> > -        ofpbuf_uninit(&upcall->upcall_buf);
> > +    switch (type) {
> > +    case SFLOW_UPCALL:
> > +        if (sflow) {
> > +            union user_action_cookie cookie;
> > +
> > +            memset(&cookie, 0, sizeof cookie);
> > +            memcpy(&cookie, nl_attr_get(dupcall->userdata),
> > +                   sizeof cookie.sflow);
> > +            dpif_sflow_received(sflow, packet, &flow, odp_in_port,
> > +                                &cookie);
> > +        }
> > +        break;
> > +    case IPFIX_UPCALL:
> > +        if (ipfix) {
> > +            dpif_ipfix_bridge_sample(ipfix, packet, &flow);
> > +        }
> > +        break;
> > +    case FLOW_SAMPLE_UPCALL:
> > +        if (ipfix) {
> > +            union user_action_cookie cookie;
> > +
> > +            memset(&cookie, 0, sizeof cookie);
> > +            memcpy(&cookie, nl_attr_get(dupcall->userdata),
> > +                   sizeof cookie.flow_sample);
> > +
> > +            /* The flow reflects exactly the contents of the packet.
> > +             * Sample the packet using it. */
> > +            dpif_ipfix_flow_sample(ipfix, packet, &flow,
> > +                                   cookie.flow_sample.collector_set_id,
> > +                                   cookie.flow_sample.probability,
> > +                                   cookie.flow_sample.obs_domain_id,
> > +                                   cookie.flow_sample.obs_point_id);
> > +        }
> > +        break;
> > +    case BAD_UPCALL:
> > +        break;
> > +    case MISS_UPCALL:
> > +        OVS_NOT_REACHED();
> >      }
> >
> > -    return n_upcalls;
> > +    dpif_ipfix_unref(ipfix);
> > +    dpif_sflow_unref(sflow);
> > +    error = EAGAIN;
> > +
> > +destroy_upcall:
> > +    ofpbuf_uninit(&upcall->dpif_upcall.packet);
> > +    ofpbuf_uninit(&upcall->upcall_buf);
> > +    return error;
> >  }
> >
> >  static void
> > -handle_upcalls(struct handler *handler, struct upcall *upcalls,
> > +handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
> >                 size_t n_upcalls)
> >  {
> > -    struct udpif *udpif = handler->udpif;
> >      struct dpif_op *opsp[UPCALL_MAX_BATCH * 2];
> >      struct dpif_op ops[UPCALL_MAX_BATCH * 2];
> >      size_t n_ops, i;
> > diff --git a/ofproto/ofproto-dpif-upcall.h b/ofproto/ofproto-dpif-upcall.h
> > index 8c4b655..2b197ad 100644
> > --- a/ofproto/ofproto-dpif-upcall.h
> > +++ b/ofproto/ofproto-dpif-upcall.h
> > @@ -19,6 +19,8 @@
> >
> >  struct dpif;
> >  struct dpif_backer;
> > +struct dpif_upcall;
> > +struct ofpbuf;
> >  struct seq;
> >  struct simap;
> >
> > @@ -26,6 +28,9 @@ struct simap;
> >   * them.  Additionally, it's responsible for maintaining the datapath flow
> >   * table. */
> >
> > +void exec_upcalls(struct dpif *, struct dpif_upcall *, struct ofpbuf *,
> > +                  int cnt);
> > +
> >  struct udpif *udpif_create(struct dpif_backer *, struct dpif *);
> >  void udpif_run(struct udpif *udpif);
> >  void udpif_set_threads(struct udpif *, size_t n_handlers,
> > --
> > 1.7.9.5
> >
> > _______________________________________________
> > dev mailing list
> > dev@openvswitch.org
> > http://openvswitch.org/mailman/listinfo/dev
>
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to