Looks good to me.  I'm not particularly worried about the races.

Acked-by: Ethan Jackson <[email protected]>
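For the archives, the XXX races below all have the same lost-wakeup shape: a
thread checks shared state under dp_netdev_mutex, drops the lock, and only
then reaches poll_block(), so a change made by another thread in the gap goes
unnoticed until something else wakes the thread.  Here's a minimal standalone
sketch of that window in plain pthreads (all names are hypothetical stand-ins,
not code from this patch):

    /* Minimal sketch of the lost-wakeup window marked XXX below.  Plain
     * pthreads; hypothetical names, not OVS code.  Build with -pthread. */
    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <unistd.h>

    static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    static bool work_queued = false;

    /* Plays the role of dpif_netdev_recv_wait() followed by poll_block(). */
    static void *
    waiter_main(void *arg)
    {
        bool wake_immediately;

        /* Check the queue under the lock, as recv_wait() does. */
        pthread_mutex_lock(&mutex);
        wake_immediately = work_queued;
        pthread_mutex_unlock(&mutex);

        /* The race window: if producer_main() runs right here, the check
         * above already concluded "nothing queued", so we block below
         * anyway and the wakeup is delayed, not triggered immediately. */

        if (!wake_immediately) {
            sleep(1);           /* Stands in for blocking in poll_block(). */
        }
        printf("waiter woke up\n");
        return arg;
    }

    /* Plays the role of a thread queuing an upcall (or bumping
     * dp->serial). */
    static void *
    producer_main(void *arg)
    {
        pthread_mutex_lock(&mutex);
        work_queued = true;
        pthread_mutex_unlock(&mutex);
        return arg;
    }

    int
    main(void)
    {
        pthread_t waiter, producer;

        pthread_create(&waiter, NULL, waiter_main, NULL);
        pthread_create(&producer, NULL, producer_main, NULL);
        pthread_join(waiter, NULL);
        pthread_join(producer, NULL);
        return 0;
    }

The worst case is a delayed wakeup rather than a lost event, which is
presumably why these are harmless in practice.  A condition variable, or a
re-check made under the same lock inside the blocking call itself, would
close the window if it ever matters.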
On Tue, Jul 23, 2013 at 5:07 PM, Ben Pfaff <[email protected]> wrote:
> This can be improved later but it is the simple thing to do for now.
>
> I marked a couple of races with XXX.  I don't have a really good solution
> for these, but I hope to find one.  They may be harmless in practice.
>
> Signed-off-by: Ben Pfaff <[email protected]>
> ---
>  lib/dpif-netdev.c |  203 +++++++++++++++++++++++++++++++++++++++--------------
>  1 files changed, 150 insertions(+), 53 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index d21eb8d..8763e5c 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -139,6 +139,9 @@ struct dpif_netdev {
>  /* All netdev-based datapaths. */
>  static struct shash dp_netdevs = SHASH_INITIALIZER(&dp_netdevs);
>
> +/* Global lock for all data. */
> +static pthread_mutex_t dp_netdev_mutex = PTHREAD_MUTEX_INITIALIZER;
> +
>  static int get_port_by_number(struct dp_netdev *, odp_port_t port_no,
>                                struct dp_netdev_port **portp);
>  static int get_port_by_name(struct dp_netdev *, const char *devname,
> @@ -180,9 +183,12 @@ dpif_netdev_enumerate(struct sset *all_dps)
>  {
>      struct shash_node *node;
>
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      SHASH_FOR_EACH(node, &dp_netdevs) {
>          sset_add(all_dps, node->name);
>      }
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
> +
>      return 0;
>  }
>
> @@ -293,28 +299,23 @@ dpif_netdev_open(const struct dpif_class *class, const char *name,
>                   bool create, struct dpif **dpifp)
>  {
>      struct dp_netdev *dp;
> +    int error;
>
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      dp = shash_find_data(&dp_netdevs, name);
>      if (!dp) {
> -        if (!create) {
> -            return ENODEV;
> -        } else {
> -            int error = create_dp_netdev(name, class, &dp);
> -            if (error) {
> -                return error;
> -            }
> -            ovs_assert(dp != NULL);
> -        }
> +        error = create ? create_dp_netdev(name, class, &dp) : ENODEV;
>      } else {
> -        if (dp->class != class) {
> -            return EINVAL;
> -        } else if (create) {
> -            return EEXIST;
> -        }
> +        error = (dp->class != class ? EINVAL
> +                 : create ? EEXIST
> +                 : 0);
> +    }
> +    if (!error) {
> +        *dpifp = create_dpif_netdev(dp);
>      }
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
>
> -    *dpifp = create_dpif_netdev(dp);
> -    return 0;
> +    return error;
>  }
>
>  static void
> @@ -351,19 +352,28 @@ static void
>  dpif_netdev_close(struct dpif *dpif)
>  {
>      struct dp_netdev *dp = get_dp_netdev(dpif);
> +
> +    xpthread_mutex_lock(&dp_netdev_mutex);
> +
>      ovs_assert(dp->open_cnt > 0);
>      if (--dp->open_cnt == 0 && dp->destroyed) {
>          shash_find_and_delete(&dp_netdevs, dp->name);
>          dp_netdev_free(dp);
>      }
>      free(dpif);
> +
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
>  }
>
>  static int
>  dpif_netdev_destroy(struct dpif *dpif)
>  {
>      struct dp_netdev *dp = get_dp_netdev(dpif);
> +
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      dp->destroyed = true;
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
> +
>      return 0;
>  }
>
> @@ -371,10 +381,14 @@ static int
>  dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
>  {
>      struct dp_netdev *dp = get_dp_netdev(dpif);
> +
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      stats->n_flows = hmap_count(&dp->flow_table);
>      stats->n_hit = dp->n_hit;
>      stats->n_missed = dp->n_missed;
>      stats->n_lost = dp->n_lost;
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
> +
>      return 0;
>  }
>
> @@ -444,32 +458,44 @@ dpif_netdev_port_add(struct dpif *dpif, struct netdev *netdev,
>      char namebuf[NETDEV_VPORT_NAME_BUFSIZE];
>      const char *dpif_port;
>      odp_port_t port_no;
> +    int error;
>
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      dpif_port = netdev_vport_get_dpif_port(netdev, namebuf, sizeof namebuf);
>      if (*port_nop != ODPP_NONE) {
>          uint32_t port_idx = odp_to_u32(*port_nop);
>          if (port_idx >= MAX_PORTS) {
> -            return EFBIG;
> +            error = EFBIG;
>          } else if (dp->ports[port_idx]) {
> -            return EBUSY;
> +            error = EBUSY;
> +        } else {
> +            error = 0;
> +            port_no = *port_nop;
>          }
> -        port_no = *port_nop;
>      } else {
>          port_no = choose_port(dp, dpif_port);
> +        error = port_no == ODPP_NONE ? EFBIG : 0;
>      }
> -    if (port_no != ODPP_NONE) {
> +    if (!error) {
>          *port_nop = port_no;
> -        return do_add_port(dp, dpif_port, netdev_get_type(netdev), port_no);
> +        error = do_add_port(dp, dpif_port, netdev_get_type(netdev), port_no);
>      }
> -    return EFBIG;
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
> +
> +    return error;
>  }
>
>  static int
>  dpif_netdev_port_del(struct dpif *dpif, odp_port_t port_no)
>  {
>      struct dp_netdev *dp = get_dp_netdev(dpif);
> -    return (port_no == ODPP_LOCAL ?
> -            EINVAL : do_del_port(dp, port_no));
> +    int error;
> +
> +    xpthread_mutex_lock(&dp_netdev_mutex);
> +    error = port_no == ODPP_LOCAL ? EINVAL : do_del_port(dp, port_no);
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
> +
> +    return error;
>  }
>
>  static bool
> @@ -547,10 +573,13 @@ dpif_netdev_port_query_by_number(const struct dpif *dpif, odp_port_t port_no,
>      struct dp_netdev_port *port;
>      int error;
>
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      error = get_port_by_number(dp, port_no, &port);
>      if (!error && dpif_port) {
>          answer_port_query(port, dpif_port);
>      }
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
> +
>      return error;
>  }
>
> @@ -562,10 +591,13 @@ dpif_netdev_port_query_by_name(const struct dpif *dpif, const char *devname,
>      struct dp_netdev_port *port;
>      int error;
>
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      error = get_port_by_name(dp, devname, &port);
>      if (!error && dpif_port) {
>          answer_port_query(port, dpif_port);
>      }
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
> +
>      return error;
>  }
>
> @@ -597,7 +629,11 @@ static int
>  dpif_netdev_flow_flush(struct dpif *dpif)
>  {
>      struct dp_netdev *dp = get_dp_netdev(dpif);
> +
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      dp_netdev_flow_flush(dp);
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
> +
>      return 0;
>  }
>
> @@ -621,6 +657,7 @@ dpif_netdev_port_dump_next(const struct dpif *dpif, void *state_,
>      struct dp_netdev *dp = get_dp_netdev(dpif);
>      uint32_t port_idx;
>
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      for (port_idx = odp_to_u32(state->port_no);
>           port_idx < MAX_PORTS; port_idx++) {
>          struct dp_netdev_port *port = dp->ports[port_idx];
> @@ -631,9 +668,13 @@ dpif_netdev_port_dump_next(const struct dpif *dpif, void *state_,
>              dpif_port->type = port->type;
>              dpif_port->port_no = port->port_no;
>              state->port_no = u32_to_odp(port_idx + 1);
> +            xpthread_mutex_unlock(&dp_netdev_mutex);
> +
>              return 0;
>          }
>      }
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
> +
>      return EOF;
>  }
>
> @@ -650,21 +691,34 @@ static int
>  dpif_netdev_port_poll(const struct dpif *dpif_, char **devnamep OVS_UNUSED)
>  {
>      struct dpif_netdev *dpif = dpif_netdev_cast(dpif_);
> +    int error;
> +
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      if (dpif->dp_serial != dpif->dp->serial) {
>          dpif->dp_serial = dpif->dp->serial;
> -        return ENOBUFS;
> +        error = ENOBUFS;
>      } else {
> -        return EAGAIN;
> +        error = EAGAIN;
>      }
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
> +
> +    return error;
>  }
>
>  static void
>  dpif_netdev_port_poll_wait(const struct dpif *dpif_)
>  {
>      struct dpif_netdev *dpif = dpif_netdev_cast(dpif_);
> +
> +    /* XXX In a multithreaded process, there is a race window between this
> +     * function and the poll_block() in one thread and a change in
> +     * dpif->dp->serial in another thread. */
> +
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      if (dpif->dp_serial != dpif->dp->serial) {
>          poll_immediate_wake();
>      }
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
>  }
>
>  static struct dp_netdev_flow *
> @@ -737,18 +791,21 @@ dpif_netdev_flow_get(const struct dpif *dpif,
>          return error;
>      }
>
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      flow = dp_netdev_lookup_flow(dp, &key);
> -    if (!flow) {
> -        return ENOENT;
> +    if (flow) {
> +        if (stats) {
> +            get_dpif_flow_stats(flow, stats);
> +        }
> +        if (actionsp) {
> +            *actionsp = ofpbuf_clone_data(flow->actions, flow->actions_len);
> +        }
> +    } else {
> +        error = ENOENT;
>      }
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
>
> -    if (stats) {
> -        get_dpif_flow_stats(flow, stats);
> -    }
> -    if (actionsp) {
> -        *actionsp = ofpbuf_clone_data(flow->actions, flow->actions_len);
> -    }
> -    return 0;
> +    return error;
>  }
>
>  static int
> @@ -803,6 +860,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
>          return error;
>      }
>
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      flow = dp_netdev_lookup_flow(dp, &key);
>      if (!flow) {
>          if (put->flags & DPIF_FP_CREATE) {
> @@ -810,17 +868,17 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
>                  if (put->stats) {
>                      memset(put->stats, 0, sizeof *put->stats);
>                  }
> -                return dp_netdev_flow_add(dp, &key, put->actions,
> -                                          put->actions_len);
> +                error = dp_netdev_flow_add(dp, &key, put->actions,
> +                                           put->actions_len);
>              } else {
> -                return EFBIG;
> +                error = EFBIG;
>              }
>          } else {
> -            return ENOENT;
> +            error = ENOENT;
>          }
>      } else {
>          if (put->flags & DPIF_FP_MODIFY) {
> -            int error = set_flow_actions(flow, put->actions, put->actions_len);
> +            error = set_flow_actions(flow, put->actions, put->actions_len);
>              if (!error) {
>                  if (put->stats) {
>                      get_dpif_flow_stats(flow, put->stats);
> @@ -829,11 +887,13 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
>                      clear_stats(flow);
>                  }
>              }
> -            return error;
>          } else {
> -            return EEXIST;
> +            error = EEXIST;
>          }
>      }
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
> +
> +    return error;
>  }
>
>  static int
> @@ -849,16 +909,19 @@ dpif_netdev_flow_del(struct dpif *dpif, const struct dpif_flow_del *del)
>          return error;
>      }
>
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      flow = dp_netdev_lookup_flow(dp, &key);
>      if (flow) {
>          if (del->stats) {
>              get_dpif_flow_stats(flow, del->stats);
>          }
>          dp_netdev_free_flow(dp, flow);
> -        return 0;
>      } else {
> -        return ENOENT;
> +        error = ENOENT;
>      }
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
> +
> +    return error;
>  }
>
>  struct dp_netdev_flow_state {
> @@ -893,8 +956,10 @@ dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_,
>      struct dp_netdev_flow *flow;
>      struct hmap_node *node;
>
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      node = hmap_at_position(&dp->flow_table, &state->bucket, &state->offset);
>      if (!node) {
> +        xpthread_mutex_unlock(&dp_netdev_mutex);
>          return EOF;
>      }
>
> @@ -928,6 +993,7 @@ dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_,
>          *stats = &state->stats;
>      }
>
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
>      return 0;
>  }
>
> @@ -963,8 +1029,10 @@ dpif_netdev_execute(struct dpif *dpif, const struct dpif_execute *execute)
>      error = dpif_netdev_flow_from_nlattrs(execute->key, execute->key_len,
>                                            &key);
>      if (!error) {
> +        xpthread_mutex_lock(&dp_netdev_mutex);
>          dp_netdev_execute_actions(dp, &copy, &key,
>                                    execute->actions, execute->actions_len);
> +        xpthread_mutex_unlock(&dp_netdev_mutex);
>      }
>
>      ofpbuf_uninit(&copy);
> @@ -1004,7 +1072,11 @@ static int
>  dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
>                   struct ofpbuf *buf)
>  {
> -    struct dp_netdev_queue *q = find_nonempty_queue(dpif);
> +    struct dp_netdev_queue *q;
> +    int error;
> +
> +    xpthread_mutex_lock(&dp_netdev_mutex);
> +    q = find_nonempty_queue(dpif);
>      if (q) {
>          struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
>
> @@ -1014,28 +1086,36 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
>          ofpbuf_uninit(buf);
>          *buf = u->buf;
>
> -        return 0;
> +        error = 0;
>      } else {
> -        return EAGAIN;
> +        error = EAGAIN;
>      }
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
> +
> +    return error;
>  }
>
>  static void
>  dpif_netdev_recv_wait(struct dpif *dpif)
>  {
> +    /* XXX In a multithreaded process, there is a race window between this
> +     * function and the poll_block() in one thread and a packet being queued in
> +     * another thread. */
> +
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      if (find_nonempty_queue(dpif)) {
>          poll_immediate_wake();
> -    } else {
> -        /* No messages ready to be received, and dp_wait() will ensure that we
> -         * wake up to queue new messages, so there is nothing to do. */
>      }
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
>  }
>
>  static void
>  dpif_netdev_recv_purge(struct dpif *dpif)
>  {
>      struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
> +    xpthread_mutex_lock(&dp_netdev_mutex);
>      dp_netdev_purge_queues(dpif_netdev->dp);
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
>  }
>
>  static void
> @@ -1076,10 +1156,12 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
>  static void
>  dpif_netdev_run(struct dpif *dpif)
>  {
> -    struct dp_netdev *dp = get_dp_netdev(dpif);
>      struct dp_netdev_port *port;
> +    struct dp_netdev *dp;
>      struct ofpbuf packet;
>
> +    xpthread_mutex_lock(&dp_netdev_mutex);
> +    dp = get_dp_netdev(dpif);
>      ofpbuf_init(&packet,
>                  DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + dp->max_mtu);
>
> @@ -1101,19 +1183,34 @@ dpif_netdev_run(struct dpif *dpif)
>          }
>      }
>      ofpbuf_uninit(&packet);
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
>  }
>
>  static void
>  dpif_netdev_wait(struct dpif *dpif)
>  {
> -    struct dp_netdev *dp = get_dp_netdev(dpif);
>      struct dp_netdev_port *port;
>
> -    LIST_FOR_EACH (port, node, &dp->port_list) {
> +    /* There is a race here, if thread A calls dpif_netdev_wait(dpif) and
> +     * thread B calls dpif_port_add(dpif) or dpif_port_remove(dpif) before
> +     * A makes it to poll_block().
> +     *
> +     * But I think it doesn't matter:
> +     *
> +     *     - In the dpif_port_add() case, A will not wake up when a packet
> +     *       arrives on the new port, but this would also happen if the
> +     *       ordering were reversed.
> +     *
> +     *     - In the dpif_port_remove() case, A might wake up spuriously, but
> +     *       that is harmless. */
> +
> +    xpthread_mutex_lock(&dp_netdev_mutex);
> +    LIST_FOR_EACH (port, node, &get_dp_netdev(dpif)->port_list) {
>          if (port->rx) {
>              netdev_rx_wait(port->rx);
>          }
>      }
> +    xpthread_mutex_unlock(&dp_netdev_mutex);
>  }
>
>  static void
> --
> 1.7.2.5

_______________________________________________
dev mailing list
[email protected]
http://openvswitch.org/mailman/listinfo/dev
