On Fri, Jun 13, 2014 at 5:00 PM, Daniele Di Proietto
<[email protected]> wrote:
> The netdev_send function has been modified to accept multiple packets, to
> allow netdev providers to amortize locking and queuing costs.
> This is especially true for netdev-dpdk.
>
> Later commits exploit the new API.
>
> Signed-off-by: Daniele Di Proietto <[email protected]>
> ---
> lib/dpif-netdev.c | 4 +-
> lib/netdev-bsd.c | 56 ++++++++++--------
> lib/netdev-dpdk.c | 160
> ++++++++++++++++++++++++++++++++------------------
> lib/netdev-dummy.c | 69 +++++++++++++---------
> lib/netdev-linux.c | 54 ++++++++++-------
> lib/netdev-provider.h | 18 +++---
> lib/netdev.c | 17 ++++--
> lib/netdev.h | 3 +-
> 8 files changed, 236 insertions(+), 145 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 332bbda..86e36bc 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -2123,7 +2123,9 @@ dp_execute_cb(void *aux_, struct dpif_packet *packet,
> case OVS_ACTION_ATTR_OUTPUT:
> p = dp_netdev_lookup_port(aux->dp, u32_to_odp(nl_attr_get_u32(a)));
> if (p) {
> - netdev_send(p->netdev, &packet->ofpbuf, may_steal);
> + struct ofpbuf * ofp = &packet->ofpbuf;
> +
Style nit: extra space in "struct ofpbuf * ofp" — per OVS coding style the '*' binds to the variable, so this should be "struct ofpbuf *ofp".
> + netdev_send(p->netdev, &ofp, 1, may_steal);
> }
> break;
>
> diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
> index 27d90f0..b53de0d 100644
> --- a/lib/netdev-bsd.c
> +++ b/lib/netdev-bsd.c
> @@ -685,13 +685,13 @@ netdev_bsd_rxq_drain(struct netdev_rxq *rxq_)
> * system or a tap device.
> */
> static int
> -netdev_bsd_send(struct netdev *netdev_, struct ofpbuf *pkt, bool may_steal)
> +netdev_bsd_send(struct netdev *netdev_, struct ofpbuf **pkts, int cnt,
> + bool may_steal)
> {
> struct netdev_bsd *dev = netdev_bsd_cast(netdev_);
> const char *name = netdev_get_name(netdev_);
> - const void *data = ofpbuf_data(pkt);
> - size_t size = ofpbuf_size(pkt);
> int error;
> + int i;
>
> ovs_mutex_lock(&dev->mutex);
> if (dev->tap_fd < 0 && !dev->pcap) {
> @@ -700,35 +700,43 @@ netdev_bsd_send(struct netdev *netdev_, struct ofpbuf
> *pkt, bool may_steal)
> error = 0;
> }
>
> - while (!error) {
> - ssize_t retval;
> - if (dev->tap_fd >= 0) {
> - retval = write(dev->tap_fd, data, size);
> - } else {
> - retval = pcap_inject(dev->pcap, data, size);
> - }
> - if (retval < 0) {
> - if (errno == EINTR) {
> - continue;
> + for (i = 0; i < cnt; i++) {
> + const void *data = ofpbuf_data(pkts[i]);
> + size_t size = ofpbuf_size(pkts[i]);
> +
> + while (!error) {
> + ssize_t retval;
> + if (dev->tap_fd >= 0) {
> + retval = write(dev->tap_fd, data, size);
> } else {
> - error = errno;
> - if (error != EAGAIN) {
> - VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: "
> - "%s", name, ovs_strerror(error));
> + retval = pcap_inject(dev->pcap, data, size);
> + }
> + if (retval < 0) {
> + if (errno == EINTR) {
> + continue;
> + } else {
> + error = errno;
> + if (error != EAGAIN) {
> + VLOG_WARN_RL(&rl, "error sending Ethernet packet on"
> + " %s: %s", name, ovs_strerror(error));
> + }
> }
> + } else if (retval != size) {
> + VLOG_WARN_RL(&rl, "sent partial Ethernet packet "
> + "(%"PRIuSIZE" bytes of "
> + "%"PRIuSIZE") on %s", retval, size, name);
> + error = EMSGSIZE;
> + } else {
> + break;
> }
> - } else if (retval != size) {
> - VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIuSIZE"
> bytes of "
> - "%"PRIuSIZE") on %s", retval, size, name);
> - error = EMSGSIZE;
> - } else {
> - break;
> }
> }
>
> ovs_mutex_unlock(&dev->mutex);
> if (may_steal) {
> - ofpbuf_delete(pkt);
> + for (i = 0; i < cnt; i++) {
> +            ofpbuf_delete(pkts[i]);
> + }
> }
>
> return error;
> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
> index ec6565a..74c8e57 100644
> --- a/lib/netdev-dpdk.c
> +++ b/lib/netdev-dpdk.c
> @@ -611,103 +611,149 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct
> dpif_packet **packets,
> }
>
> inline static void
> -dpdk_queue_pkt(struct netdev_dpdk *dev, int qid,
> - struct rte_mbuf *pkt)
> +dpdk_queue_pkts(struct netdev_dpdk *dev, int qid,
> + struct rte_mbuf **pkts, int cnt)
> {
> struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> uint64_t diff_tsc;
> uint64_t cur_tsc;
> uint32_t nb_tx;
>
> + int i = 0;
> +
> rte_spinlock_lock(&txq->tx_lock);
> - txq->burst_pkts[txq->count++] = pkt;
> - if (txq->count == MAX_TX_QUEUE_LEN) {
> - goto flush;
> - }
> - cur_tsc = rte_get_timer_cycles();
> - if (txq->count == 1) {
> - txq->tsc = cur_tsc;
> - }
> - diff_tsc = cur_tsc - txq->tsc;
> - if (diff_tsc >= DRAIN_TSC) {
> - goto flush;
> - }
> - rte_spinlock_unlock(&txq->tx_lock);
> - return;
> + while (i < cnt) {
> + int freeslots = MAX_TX_QUEUE_LEN - txq->count;
> + int tocopy = MIN(freeslots, cnt-i);
>
> -flush:
> - nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count);
> - if (nb_tx != txq->count) {
> - /* free buffers if we couldn't transmit packets */
> - rte_mempool_put_bulk(dev->dpdk_mp->mp,
> - (void **) &txq->burst_pkts[nb_tx],
> - (txq->count - nb_tx));
> + memcpy(&txq->burst_pkts[txq->count], &pkts[i],
> + tocopy * sizeof (struct rte_mbuf *));
> +
> + txq->count += tocopy;
> + i += tocopy;
> +
> + if (txq->count == MAX_TX_QUEUE_LEN) {
> + goto flush;
> + }
> + cur_tsc = rte_get_timer_cycles();
> + if (txq->count == 1) {
> + txq->tsc = cur_tsc;
> + }
> + diff_tsc = cur_tsc - txq->tsc;
> + if (diff_tsc >= DRAIN_TSC) {
> + goto flush;
> + }
> + continue;
> +
> + flush:
> + nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts,
> + txq->count);
> + if (nb_tx != txq->count) {
> + /* free buffers if we couldn't transmit packets */
> + rte_mempool_put_bulk(dev->dpdk_mp->mp,
> + (void **) &txq->burst_pkts[nb_tx],
> + (txq->count - nb_tx));
> + }
> + txq->count = 0;
> }
> - txq->count = 0;
> rte_spinlock_unlock(&txq->tx_lock);
> }
>
> /* Tx function. Transmit packets indefinitely */
> static void
> -dpdk_do_tx_copy(struct netdev *netdev, char *buf, int size)
> +dpdk_do_tx_copy(struct netdev *netdev, struct ofpbuf ** ofpbufs, int cnt)
> {
> struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
> - struct rte_mbuf *pkt;
> + struct rte_mbuf *pkts[cnt];
> +    int newcnt = 0;
> +    int i;
>
> -    pkt = rte_pktmbuf_alloc(dev->dpdk_mp->mp);
> -    if (!pkt) {
> -        ovs_mutex_lock(&dev->mutex);
> -        dev->stats.tx_dropped++;
> -        ovs_mutex_unlock(&dev->mutex);
> -        return;
> -    }
> +    for (i = 0; i < cnt; i++) {
> +        int size = ofpbuf_size(ofpbufs[i]);
> +        if (size > dev->max_packet_len) {
> +            VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
> +                         (int)size , dev->max_packet_len);
>
> -    /* We have to do a copy for now */
> -    memcpy(pkt->pkt.data, buf, size);
> +            ovs_mutex_lock(&dev->mutex);
> +            dev->stats.tx_dropped++;
> +            ovs_mutex_unlock(&dev->mutex);
>
> -    rte_pktmbuf_data_len(pkt) = size;
> -    rte_pktmbuf_pkt_len(pkt) = size;
> +            continue;
> +        }
> +
> +        pkts[newcnt] = rte_pktmbuf_alloc(dev->dpdk_mp->mp);
>
> -    dpdk_queue_pkt(dev, NON_PMD_THREAD_TX_QUEUE, pkt);
> +        if (!pkts[newcnt]) {
> +            /* Count this and all remaining packets as dropped, without
> +             * leaking the mbufs already allocated for earlier packets. */
> +            ovs_mutex_lock(&dev->mutex);
> +            dev->stats.tx_dropped += cnt - i;
> +            ovs_mutex_unlock(&dev->mutex);
> +            break;
> +        }
> +
> +        /* We have to do a copy for now */
> +        memcpy(pkts[newcnt]->pkt.data, ofpbuf_data(ofpbufs[i]), size);
> +
> +        rte_pktmbuf_data_len(pkts[newcnt]) = size;
> +        rte_pktmbuf_pkt_len(pkts[newcnt]) = size;
> +
> +        newcnt++;
> +    }
> +
> +    dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, pkts, newcnt);
> dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE);
> }
>
> static int
> -netdev_dpdk_send(struct netdev *netdev,
> - struct ofpbuf *ofpbuf, bool may_steal)
> +netdev_dpdk_send(struct netdev *netdev, struct ofpbuf **ofpbufs, int cnt,
> + bool may_steal)
> {
> struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
> int ret;
> + int i;
>
> - if (ofpbuf_size(ofpbuf) > dev->max_packet_len) {
> - VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
> - (int)ofpbuf_size(ofpbuf) , dev->max_packet_len);
> -
> - ovs_mutex_lock(&dev->mutex);
> - dev->stats.tx_dropped++;
> - ovs_mutex_unlock(&dev->mutex);
> -
> - ret = E2BIG;
> - goto out;
> - }
> -
> - if (!may_steal || ofpbuf->source != OFPBUF_DPDK) {
> - dpdk_do_tx_copy(netdev, (char *) ofpbuf_data(ofpbuf),
> ofpbuf_size(ofpbuf));
> + if (!may_steal || ofpbufs[0]->source != OFPBUF_DPDK) {
> + dpdk_do_tx_copy(netdev, ofpbufs, cnt);
>
> if (may_steal) {
> - ofpbuf_delete(ofpbuf);
> + for (i = 0; i < cnt; i++) {
> + ofpbuf_delete(ofpbufs[i]);
> + }
> }
> } else {
> int qid;
> + int next_tx_idx = 0;
> + int dropped = 0;
>
> qid = rte_lcore_id() % NR_QUEUE;
>
> - dpdk_queue_pkt(dev, qid, (struct rte_mbuf *)ofpbuf);
> +        for (i = 0; i < cnt; i++) {
> +            int size = ofpbuf_size(ofpbufs[i]);
> +            if (OVS_UNLIKELY(size > dev->max_packet_len)) {
> +                if (next_tx_idx != i) {
> +                    dpdk_queue_pkts(dev, qid,
> +                                    (struct rte_mbuf **)&ofpbufs[next_tx_idx],
> +                                    i-next_tx_idx);
> +                }
> +
> +                VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
> +                             (int)size , dev->max_packet_len);
> +
> +                ofpbuf_delete(ofpbufs[i]);
> +                dropped++;
> +                next_tx_idx = i + 1;
> +            }
> +        }
> + if (next_tx_idx != cnt) {
> + dpdk_queue_pkts(dev, qid,
> + (struct rte_mbuf **)&ofpbufs[next_tx_idx],
> + cnt-next_tx_idx);
> + }
>
> + if (OVS_UNLIKELY(dropped)) {
> + ovs_mutex_lock(&dev->mutex);
> + dev->stats.tx_dropped += dropped;
> + ovs_mutex_unlock(&dev->mutex);
> + }
> }
> ret = 0;
>
> -out:
> return ret;
> }
>
> diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
> index 1464d29..ca82765 100644
> --- a/lib/netdev-dummy.c
> +++ b/lib/netdev-dummy.c
> @@ -845,50 +845,61 @@ netdev_dummy_rxq_drain(struct netdev_rxq *rxq_)
> }
>
> static int
> -netdev_dummy_send(struct netdev *netdev, struct ofpbuf *pkt, bool may_steal)
> +netdev_dummy_send(struct netdev *netdev, struct ofpbuf **pkts, int cnt,
> + bool may_steal)
> {
> struct netdev_dummy *dev = netdev_dummy_cast(netdev);
> - const void *buffer = ofpbuf_data(pkt);
> - size_t size = ofpbuf_size(pkt);
> + int error = 0;
> + int i;
>
> - if (size < ETH_HEADER_LEN) {
> - return EMSGSIZE;
> - } else {
> - const struct eth_header *eth = buffer;
> - int max_size;
> + for (i = 0; i < cnt; i++) {
> + const void *buffer = ofpbuf_data(pkts[i]);
> + size_t size = ofpbuf_size(pkts[i]);
>
> - ovs_mutex_lock(&dev->mutex);
> - max_size = dev->mtu + ETH_HEADER_LEN;
> - ovs_mutex_unlock(&dev->mutex);
> + if (size < ETH_HEADER_LEN) {
> + error = EMSGSIZE;
> + break;
> + } else {
> + const struct eth_header *eth = buffer;
> + int max_size;
>
> - if (eth->eth_type == htons(ETH_TYPE_VLAN)) {
> - max_size += VLAN_HEADER_LEN;
> - }
> - if (size > max_size) {
> - return EMSGSIZE;
> + ovs_mutex_lock(&dev->mutex);
> + max_size = dev->mtu + ETH_HEADER_LEN;
> + ovs_mutex_unlock(&dev->mutex);
> +
> + if (eth->eth_type == htons(ETH_TYPE_VLAN)) {
> + max_size += VLAN_HEADER_LEN;
> + }
> + if (size > max_size) {
> + error = EMSGSIZE;
> + break;
> + }
> }
> - }
>
> - ovs_mutex_lock(&dev->mutex);
> - dev->stats.tx_packets++;
> - dev->stats.tx_bytes += size;
> + ovs_mutex_lock(&dev->mutex);
> + dev->stats.tx_packets++;
> + dev->stats.tx_bytes += size;
> +
> + dummy_packet_conn_send(&dev->conn, buffer, size);
>
> - dummy_packet_conn_send(&dev->conn, buffer, size);
> + if (dev->tx_pcap) {
> + struct ofpbuf packet;
>
> - if (dev->tx_pcap) {
> - struct ofpbuf packet;
> + ofpbuf_use_const(&packet, buffer, size);
> + ovs_pcap_write(dev->tx_pcap, &packet);
> + fflush(dev->tx_pcap);
> + }
>
> - ofpbuf_use_const(&packet, buffer, size);
> - ovs_pcap_write(dev->tx_pcap, &packet);
> - fflush(dev->tx_pcap);
> + ovs_mutex_unlock(&dev->mutex);
> }
>
> - ovs_mutex_unlock(&dev->mutex);
> if (may_steal) {
> - ofpbuf_delete(pkt);
> + for (i = 0; i < cnt; i++) {
> + ofpbuf_delete(pkts[i]);
> + }
> }
>
> - return 0;
> + return error;
> }
>
> static int
> diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
> index 074a061..21b6ea4 100644
> --- a/lib/netdev-linux.c
> +++ b/lib/netdev-linux.c
> @@ -1057,12 +1057,16 @@ netdev_linux_rxq_drain(struct netdev_rxq *rxq_)
> * The kernel maintains a packet transmission queue, so the caller is not
> * expected to do additional queuing of packets. */
> static int
> -netdev_linux_send(struct netdev *netdev_, struct ofpbuf *pkt, bool may_steal)
> +netdev_linux_send(struct netdev *netdev_, struct ofpbuf **pkts, int cnt,
> + bool may_steal)
> {
> - const void *data = ofpbuf_data(pkt);
> - size_t size = ofpbuf_size(pkt);
> + int i;
> + int error = 0;
>
> - for (;;) {
> + /* 'i' is incremented only if there's no error */
> + for (i = 0; i < cnt;) {
> + const void *data = ofpbuf_data(pkts[i]);
> + size_t size = ofpbuf_size(pkts[i]);
> ssize_t retval;
>
> if (!is_tap_netdev(netdev_)) {
> @@ -1112,31 +1116,41 @@ netdev_linux_send(struct netdev *netdev_, struct
> ofpbuf *pkt, bool may_steal)
> retval = write(netdev->tap_fd, data, size);
> }
>
> - if (may_steal) {
> - ofpbuf_delete(pkt);
> - }
> -
> if (retval < 0) {
> /* The Linux AF_PACKET implementation never blocks waiting for
> room
> * for packets, instead returning ENOBUFS. Translate this into
> * EAGAIN for the caller. */
> - if (errno == ENOBUFS) {
> - return EAGAIN;
> - } else if (errno == EINTR) {
> + error = errno == ENOBUFS ? EAGAIN : errno;
> + if (error == EINTR) {
> + /* continue without incrementing 'i', i.e. retry this packet
> */
> continue;
> - } else if (errno != EAGAIN) {
> - VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: %s",
> - netdev_get_name(netdev_), ovs_strerror(errno));
> }
> - return errno;
> + break;
> } else if (retval != size) {
> - VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIuSIZE"
> bytes of "
> - "%"PRIuSIZE") on %s", retval, size,
> netdev_get_name(netdev_));
> - return EMSGSIZE;
> - } else {
> - return 0;
> + VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIuSIZE"
> bytes"
> + " of %"PRIuSIZE") on %s", retval, size,
> + netdev_get_name(netdev_));
> + error = EMSGSIZE;
> + break;
> }
> +
> + /* Process the next packet in the batch */
> + i++;
> }
> +
> + if (may_steal) {
> + for (i = 0; i < cnt; i++) {
> + ofpbuf_delete(pkts[i]);
> + }
> + }
> +
> + if (error && error != EAGAIN) {
> + VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: %s",
> + netdev_get_name(netdev_), ovs_strerror(error));
> + }
> +
> + return error;
> +
> }
>
> /* Registers with the poll loop to wake up from the next call to poll_block()
> diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
> index 42c0012..33d5173 100644
> --- a/lib/netdev-provider.h
> +++ b/lib/netdev-provider.h
> @@ -250,13 +250,16 @@ struct netdev_class {
> const struct netdev_tunnel_config *
> (*get_tunnel_config)(const struct netdev *netdev);
>
> - /* Sends the buffer on 'netdev'.
> - * Returns 0 if successful, otherwise a positive errno value. Returns
> - * EAGAIN without blocking if the packet cannot be queued immediately.
> - * Returns EMSGSIZE if a partial packet was transmitted or if the packet
> - * is too big or too small to transmit on the device.
> + /* Sends buffers on 'netdev'.
> + * Returns 0 if successful (for every buffer), otherwise a positive
> errno value.
> + * Returns EAGAIN without blocking if one or more packets cannot be
> + * queued immediately. Returns EMSGSIZE if a partial packet was
> transmitted
> + * or if a packet is too big or too small to transmit on the device.
> *
> - * To retain ownership of 'buffer' caller can set may_steal to false.
> + * If the function returns a non-zero value, some of the packets might
> have
> + * been sent anyway.
> + *
> + * To retain ownership of 'buffers' caller can set may_steal to false.
> *
> * The network device is expected to maintain a packet transmission
> queue,
> * so that the caller does not ordinarily have to do additional queuing
> of
> @@ -268,7 +271,8 @@ struct netdev_class {
> * network device from being usefully used by the netdev-based "userspace
> * datapath". It will also prevent the OVS implementation of bonding
> from
> * working properly over 'netdev'.) */
> - int (*send)(struct netdev *netdev, struct ofpbuf *buffer, bool
> may_steal);
> + int (*send)(struct netdev *netdev, struct ofpbuf **buffers, int cnt,
> + bool may_steal);
>
> /* Registers with the poll loop to wake up from the next call to
> * poll_block() when the packet transmission queue for 'netdev' has
> diff --git a/lib/netdev.c b/lib/netdev.c
> index 6a2ad51..ea4405e 100644
> --- a/lib/netdev.c
> +++ b/lib/netdev.c
> @@ -650,10 +650,14 @@ netdev_rxq_drain(struct netdev_rxq *rx)
> : 0);
> }
>
> -/* Sends 'buffer' on 'netdev'. Returns 0 if successful, otherwise a positive
> - * errno value. Returns EAGAIN without blocking if the packet cannot be
> queued
> - * immediately. Returns EMSGSIZE if a partial packet was transmitted or if
> - * the packet is too big or too small to transmit on the device.
> +/* Sends 'buffers' on 'netdev'. Returns 0 if successful (for every packet),
> + * otherwise a positive errno value. Returns EAGAIN without blocking if
> + * at least one of the packets cannot be queued immediately. Returns EMSGSIZE
> + * if a partial packet was transmitted or if a packet is too big or too small
> + * to transmit on the device.
> + *
> + * If the function returns a non-zero value, some of the packets might have
> + * been sent anyway.
> *
> * To retain ownership of 'buffer' caller can set may_steal to false.
> *
> @@ -663,12 +667,13 @@ netdev_rxq_drain(struct netdev_rxq *rx)
> * Some network devices may not implement support for this function. In such
> * cases this function will always return EOPNOTSUPP. */
> int
> -netdev_send(struct netdev *netdev, struct ofpbuf *buffer, bool may_steal)
> +netdev_send(struct netdev *netdev, struct ofpbuf **buffers, int cnt,
> + bool may_steal)
> {
> int error;
>
> error = (netdev->netdev_class->send
> - ? netdev->netdev_class->send(netdev, buffer, may_steal)
> + ? netdev->netdev_class->send(netdev, buffers, cnt, may_steal)
> : EOPNOTSUPP);
> if (!error) {
> COVERAGE_INC(netdev_sent);
> diff --git a/lib/netdev.h b/lib/netdev.h
> index c8880a4..4e9f96c 100644
> --- a/lib/netdev.h
> +++ b/lib/netdev.h
> @@ -173,7 +173,8 @@ void netdev_rxq_wait(struct netdev_rxq *);
> int netdev_rxq_drain(struct netdev_rxq *);
>
> /* Packet transmission. */
> -int netdev_send(struct netdev *, struct ofpbuf *, bool may_steal);
> +int netdev_send(struct netdev *, struct ofpbuf **buffers, int cnt,
> + bool may_steal);
> void netdev_send_wait(struct netdev *);
>
> /* Hardware address. */
> --
> 2.0.0
>
Looks good.
Acked-by: Pravin B Shelar <[email protected]>
> _______________________________________________
> dev mailing list
> [email protected]
> http://openvswitch.org/mailman/listinfo/dev
_______________________________________________
dev mailing list
[email protected]
http://openvswitch.org/mailman/listinfo/dev