Add a sched_yield() syscall if the thread spins for too long while
waiting for another thread to finish its operations on the ring.
That gives the preempted thread a chance to proceed and finish its
ring enqueue/dequeue operation.
The purpose is to reduce contention on the ring.

Signed-off-by: Cunming Liang <cunming.liang@intel.com>
---
 lib/librte_ring/rte_ring.h | 35 +++++++++++++++++++++++++++++------
 1 file changed, 29 insertions(+), 6 deletions(-)
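
For context, here is a minimal standalone sketch of the bounded
pause-then-yield wait that both hunks below apply to the tail-update
loops. The names PAUSE_REP and wait_for_tail are illustrative only
(not part of the patch), and _mm_pause() stands in for DPDK's
rte_pause() on x86:

	#include <sched.h>        /* sched_yield() */
	#include <stdint.h>
	#include <emmintrin.h>    /* _mm_pause() on x86 (SSE2) */

	#define PAUSE_REP 0x100   /* pause this many times before yielding */

	static inline void
	wait_for_tail(volatile uint32_t *tail, uint32_t expected)
	{
		unsigned rep;

		do {
			/* Spin a bounded number of times on the CPU. */
			for (rep = PAUSE_REP;
			     rep != 0 && *tail != expected; rep--)
				_mm_pause();

			/*
			 * Still not done: the thread ahead of us was
			 * likely preempted, so yield the CPU to let
			 * it run and update the tail.
			 */
			if (rep == 0)
				sched_yield();
		} while (rep == 0);
	}

The bound keeps the common case (no preemption) a pure busy-wait, while
a waiter that exhausts PAUSE_REP iterations gives up its timeslice
instead of burning a full scheduling quantum spinning on a stalled tail.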

diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 39bacdd..c402c73 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -126,6 +126,7 @@ struct rte_ring_debug_stats {

 #define RTE_RING_NAMESIZE 32 /**< The maximum length of a ring name. */
 #define RTE_RING_MZ_PREFIX "RG_"
+#define RTE_RING_PAUSE_REP 0x100  /**< yield after pausing this many times. */

 /**
  * An RTE ring structure.
@@ -410,7 +411,7 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
        uint32_t cons_tail, free_entries;
        const unsigned max = n;
        int success;
-       unsigned i;
+       unsigned i, rep;
        uint32_t mask = r->prod.mask;
        int ret;

@@ -468,8 +469,19 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
         * If there are other enqueues in progress that preceded us,
         * we need to wait for them to complete
         */
-       while (unlikely(r->prod.tail != prod_head))
-               rte_pause();
+       do {
+               /* avoid spinning too long waiting for the other thread to finish */
+               for (rep = RTE_RING_PAUSE_REP;
+                    rep != 0 && r->prod.tail != prod_head; rep--)
+                       rte_pause();
+
+               /*
+                * It gives the preempted thread a chance to proceed and
+                * finish its ring enqueue operation.
+                */
+               if (rep == 0)
+                       sched_yield();
+       } while (rep == 0);

        r->prod.tail = prod_next;
        return ret;
@@ -589,7 +601,7 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
        uint32_t cons_next, entries;
        const unsigned max = n;
        int success;
-       unsigned i;
+       unsigned i, rep;
        uint32_t mask = r->prod.mask;

        /* move cons.head atomically */
@@ -634,8 +646,19 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
         * If there are other dequeues in progress that preceded us,
         * we need to wait for them to complete
         */
-       while (unlikely(r->cons.tail != cons_head))
-               rte_pause();
+       do {
+               /* avoid spinning too long waiting for the other thread to finish */
+               for (rep = RTE_RING_PAUSE_REP;
+                    rep != 0 && r->cons.tail != cons_head; rep--)
+                       rte_pause();
+
+               /*
+                * It gives the preempted thread a chance to proceed and
+                * finish its ring dequeue operation.
+                */
+               if (rep == 0)
+                       sched_yield();
+       } while (rep == 0);

        __RING_STAT_ADD(r, deq_success, n);
        r->cons.tail = cons_next;
-- 
1.8.1.4
