Convert allow_queuing, while_queuing, started, and dev_attached from
rte_atomic32_t to RTE_ATOMIC(uint32_t) and replace rte_atomic32_*()
with rte_atomic_*_explicit().

The data-path / control-thread handshake on allow_queuing and
while_queuing is a Dekker-style mutual-visibility pattern: each side
stores its own flag and then loads the peer's. Both legs must be
seq_cst to forbid store-load reordering; anything weaker permits both
sides to miss each other. The previous rte_atomic32_set/read compiled
to plain volatile stores/loads and provided no such ordering, so this
also closes a latent ordering hole on weakly-ordered ISAs.

The data-path exit store of while_queuing=0 is release, ordering
preceding slot accesses before the control thread observes the data
path as idle.

The flags started and dev_attached are consulted only inside
update_queuing_status, where the per-queue handshake provides the
real synchronization; their loads and stores are relaxed.

Factor the per-queue allow_queuing store and while_queuing wait into
a small update_queue() helper used by both rx and tx loops.

Signed-off-by: Stephen Hemminger <[email protected]>
---
 drivers/net/vhost/rte_eth_vhost.c | 103 +++++++++++++++++++-----------
 1 file changed, 65 insertions(+), 38 deletions(-)

diff --git a/drivers/net/vhost/rte_eth_vhost.c 
b/drivers/net/vhost/rte_eth_vhost.c
index 05940f2461..3b1eedfe42 100644
--- a/drivers/net/vhost/rte_eth_vhost.c
+++ b/drivers/net/vhost/rte_eth_vhost.c
@@ -73,8 +73,8 @@ struct vhost_stats {
 
 struct vhost_queue {
        int vid;
-       rte_atomic32_t allow_queuing;
-       rte_atomic32_t while_queuing;
+       RTE_ATOMIC(uint32_t) allow_queuing;
+       RTE_ATOMIC(uint32_t) while_queuing;
        struct pmd_internal *internal;
        struct rte_mempool *mb_pool;
        uint16_t port;
@@ -86,14 +86,14 @@ struct vhost_queue {
 };
 
 struct pmd_internal {
-       rte_atomic32_t dev_attached;
+       RTE_ATOMIC(uint32_t) dev_attached;
        char *iface_name;
        uint64_t flags;
        uint64_t disable_flags;
        uint64_t features;
        uint16_t max_queues;
        int vid;
-       rte_atomic32_t started;
+       RTE_ATOMIC(uint32_t) started;
        bool vlan_strip;
        bool rx_sw_csum;
        bool tx_sw_csum;
@@ -406,12 +406,19 @@ eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t 
nb_bufs)
        uint16_t i, nb_rx = 0;
        uint16_t nb_receive = nb_bufs;
 
-       if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
+       /* Fast-path early exit; racy load is fine here -- if we miss a
+        * transition we get caught by the seq_cst check below.
+        */
+       if (unlikely(rte_atomic_load_explicit(&r->allow_queuing, 
rte_memory_order_relaxed) == 0))
                return 0;
 
-       rte_atomic32_set(&r->while_queuing, 1);
-
-       if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
+       /* Announce presence, then re-check. The store and the following
+        * load MUST both be seq_cst so they are totally ordered with the
+        * control thread's store-to-allow_queuing / load-of-while_queuing
+        * pair. Anything weaker permits both sides to miss each other.
+        */
+       rte_atomic_store_explicit(&r->while_queuing, 1, 
rte_memory_order_seq_cst);
+       if (unlikely(rte_atomic_load_explicit(&r->allow_queuing, 
rte_memory_order_seq_cst) == 0))
                goto out;
 
        /* Dequeue packets from guest TX queue */
@@ -446,7 +453,7 @@ eth_vhost_rx(void *q, struct rte_mbuf **bufs, uint16_t 
nb_bufs)
        }
 
 out:
-       rte_atomic32_set(&r->while_queuing, 0);
+       rte_atomic_store_explicit(&r->while_queuing, 0, 
rte_memory_order_release);
 
        return nb_rx;
 }
@@ -460,12 +467,19 @@ eth_vhost_tx(void *q, struct rte_mbuf **bufs, uint16_t 
nb_bufs)
        uint64_t nb_bytes = 0;
        uint64_t nb_missed = 0;
 
-       if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
+       /* Fast-path early exit; racy load is fine here -- if we miss a
+        * transition we get caught by the seq_cst check below.
+        */
+       if (unlikely(rte_atomic_load_explicit(&r->allow_queuing, 
rte_memory_order_relaxed) == 0))
                return 0;
 
-       rte_atomic32_set(&r->while_queuing, 1);
-
-       if (unlikely(rte_atomic32_read(&r->allow_queuing) == 0))
+       /* Announce presence, then re-check. The store and the following
+        * load MUST both be seq_cst so they are totally ordered with the
+        * control thread's store-to-allow_queuing / load-of-while_queuing
+        * pair. Anything weaker permits both sides to miss each other.
+        */
+       rte_atomic_store_explicit(&r->while_queuing, 1, 
rte_memory_order_seq_cst);
+       if (unlikely(rte_atomic_load_explicit(&r->allow_queuing, 
rte_memory_order_seq_cst) == 0))
                goto out;
 
        for (i = 0; i < nb_bufs; i++) {
@@ -515,7 +529,7 @@ eth_vhost_tx(void *q, struct rte_mbuf **bufs, uint16_t 
nb_bufs)
        for (i = 0; likely(i < nb_tx); i++)
                rte_pktmbuf_free(bufs[i]);
 out:
-       rte_atomic32_set(&r->while_queuing, 0);
+       rte_atomic_store_explicit(&r->while_queuing, 0, 
rte_memory_order_release);
 
        return nb_tx;
 }
@@ -744,6 +758,19 @@ eth_vhost_unconfigure_intr(struct rte_eth_dev *eth_dev)
        }
 }
 
+static inline void
+update_queue(struct vhost_queue *vq, uint32_t allow, bool wait_queuing)
+{
+       /* seq_cst: pairs with the data-path's seq_cst store of
+        * while_queuing and seq_cst load of allow_queuing.  See
+        * eth_vhost_rx().
+        */
+       rte_atomic_store_explicit(&vq->allow_queuing, allow, 
rte_memory_order_seq_cst);
+       if (wait_queuing)
+               rte_wait_until_equal_32((volatile uint32_t 
*)(uintptr_t)&vq->while_queuing,
+                                       0, rte_memory_order_seq_cst);
+}
+
 static void
 update_queuing_status(struct rte_eth_dev *dev, bool wait_queuing)
 {
@@ -751,14 +778,18 @@ update_queuing_status(struct rte_eth_dev *dev, bool 
wait_queuing)
        struct vhost_queue *vq;
        struct rte_vhost_vring_state *state;
        unsigned int i;
-       int allow_queuing = 1;
+       bool allow_queuing = true;
 
        if (!dev->data->rx_queues || !dev->data->tx_queues)
                return;
 
-       if (rte_atomic32_read(&internal->started) == 0 ||
-           rte_atomic32_read(&internal->dev_attached) == 0)
-               allow_queuing = 0;
+       /* These are control-plane flags consulted only here;
+        * the real data-path handshake is on vq->allow_queuing below.
+        * Relaxed is sufficient.
+        */
+       if (rte_atomic_load_explicit(&internal->started, 
rte_memory_order_relaxed) == 0 ||
+           rte_atomic_load_explicit(&internal->dev_attached, 
rte_memory_order_relaxed) == 0)
+               allow_queuing = false;
 
        state = vring_states[dev->data->port_id];
 
@@ -767,24 +798,18 @@ update_queuing_status(struct rte_eth_dev *dev, bool 
wait_queuing)
                vq = dev->data->rx_queues[i];
                if (vq == NULL)
                        continue;
-               if (allow_queuing && state->cur[vq->virtqueue_id])
-                       rte_atomic32_set(&vq->allow_queuing, 1);
-               else
-                       rte_atomic32_set(&vq->allow_queuing, 0);
-               while (wait_queuing && rte_atomic32_read(&vq->while_queuing))
-                       rte_pause();
+
+               update_queue(vq, !!(allow_queuing && 
state->cur[vq->virtqueue_id]),
+                            wait_queuing);
        }
 
        for (i = 0; i < dev->data->nb_tx_queues; i++) {
                vq = dev->data->tx_queues[i];
                if (vq == NULL)
                        continue;
-               if (allow_queuing && state->cur[vq->virtqueue_id])
-                       rte_atomic32_set(&vq->allow_queuing, 1);
-               else
-                       rte_atomic32_set(&vq->allow_queuing, 0);
-               while (wait_queuing && rte_atomic32_read(&vq->while_queuing))
-                       rte_pause();
+
+               update_queue(vq, !!(allow_queuing && 
state->cur[vq->virtqueue_id]),
+                            wait_queuing);
        }
 }
 
@@ -848,7 +873,7 @@ new_device(int vid)
        }
 
        internal->vid = vid;
-       if (rte_atomic32_read(&internal->started) == 1) {
+       if (rte_atomic_load_explicit(&internal->started, 
rte_memory_order_relaxed) == 1) {
                queue_setup(eth_dev, internal);
                if (dev_conf->intr_conf.rxq)
                        eth_vhost_configure_intr(eth_dev);
@@ -863,7 +888,7 @@ new_device(int vid)
 
        vhost_dev_csum_configure(eth_dev);
 
-       rte_atomic32_set(&internal->dev_attached, 1);
+       rte_atomic_store_explicit(&internal->dev_attached, 1, 
rte_memory_order_relaxed);
        update_queuing_status(eth_dev, false);
 
        VHOST_LOG_LINE(INFO, "Vhost device %d created", vid);
@@ -893,7 +918,7 @@ destroy_device(int vid)
        eth_dev = list->eth_dev;
        internal = eth_dev->data->dev_private;
 
-       rte_atomic32_set(&internal->dev_attached, 0);
+       rte_atomic_store_explicit(&internal->dev_attached, 0, 
rte_memory_order_relaxed);
        update_queuing_status(eth_dev, true);
        eth_vhost_unconfigure_intr(eth_dev);
 
@@ -1148,11 +1173,11 @@ eth_dev_start(struct rte_eth_dev *eth_dev)
        }
 
        queue_setup(eth_dev, internal);
-       if (rte_atomic32_read(&internal->dev_attached) == 1 &&
+       if (rte_atomic_load_explicit(&internal->dev_attached, 
rte_memory_order_relaxed) == 1 &&
                        dev_conf->intr_conf.rxq)
                eth_vhost_configure_intr(eth_dev);
 
-       rte_atomic32_set(&internal->started, 1);
+       rte_atomic_store_explicit(&internal->started, 1, 
rte_memory_order_relaxed);
        update_queuing_status(eth_dev, false);
 
        for (i = 0; i < eth_dev->data->nb_rx_queues; i++)
@@ -1170,7 +1195,7 @@ eth_dev_stop(struct rte_eth_dev *dev)
        uint16_t i;
 
        dev->data->dev_started = 0;
-       rte_atomic32_set(&internal->started, 0);
+       rte_atomic_store_explicit(&internal->started, 0, 
rte_memory_order_relaxed);
        update_queuing_status(dev, true);
 
        for (i = 0; i < dev->data->nb_rx_queues; i++)
@@ -1471,8 +1496,10 @@ vhost_dev_priv_dump(struct rte_eth_dev *dev, FILE *f)
        fprintf(f, "features: 0x%" PRIx64 "\n", internal->features);
        fprintf(f, "max_queues: %u\n", internal->max_queues);
        fprintf(f, "vid: %d\n", internal->vid);
-       fprintf(f, "started: %d\n", rte_atomic32_read(&internal->started));
-       fprintf(f, "dev_attached: %d\n", 
rte_atomic32_read(&internal->dev_attached));
+       fprintf(f, "started: %u\n",
+               rte_atomic_load_explicit(&internal->started, 
rte_memory_order_relaxed));
+       fprintf(f, "dev_attached: %u\n",
+               rte_atomic_load_explicit(&internal->dev_attached, 
rte_memory_order_relaxed));
        fprintf(f, "vlan_strip: %d\n", internal->vlan_strip);
        fprintf(f, "rx_sw_csum: %d\n", internal->rx_sw_csum);
        fprintf(f, "tx_sw_csum: %d\n", internal->tx_sw_csum);
-- 
2.53.0

Reply via email to