The move head function has an optimization for updating the head when it is used on a single-threaded ring. The code is cleaner if the two cases are split into separate functions.
Signed-off-by: Stephen Hemminger <[email protected]> --- lib/ring/rte_ring_c11_pvt.h | 100 +++++++++++++++++++++++++------- lib/ring/rte_ring_elem_pvt.h | 16 +++-- lib/ring/rte_ring_generic_pvt.h | 77 ++++++++++++++++++++---- lib/ring/soring.c | 24 +++++--- 4 files changed, 171 insertions(+), 46 deletions(-) diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h index 07b6efc416..5afc14dec9 100644 --- a/lib/ring/rte_ring_c11_pvt.h +++ b/lib/ring/rte_ring_c11_pvt.h @@ -46,6 +46,7 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, /** * @internal This is a helper function that moves the producer/consumer head + * optimized for single threaded case * * @param d * A pointer to the headtail structure with head value to be moved @@ -54,8 +55,6 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, * function only reads tail value from it * @param capacity * Either ring capacity value (for producer), or zero (for consumer) - * @param is_st - * Indicates whether multi-thread safe path is needed or not * @param n * The number of elements we want to move head value on * @param behavior @@ -72,14 +71,77 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only */ static __rte_always_inline unsigned int -__rte_ring_headtail_move_head(struct rte_ring_headtail *d, +__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d, const struct rte_ring_headtail *s, uint32_t capacity, - unsigned int is_st, unsigned int n, + unsigned int n, enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *new_head, uint32_t *entries) { uint32_t stail; - int success; + + /* Single producer: only this thread writes d->head, + * so a relaxed load is sufficient. 
+ */ + *old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_relaxed); + + /* Acquire pairs with the consumer's release-store of tail in __rte_ring_update_tail, + * ensuring the consumer's ring-element reads are complete before + * we observe the updated tail. + */ + stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire); + + /* Unsigned subtraction is modulo 2^32, so entries is always in + * [0, capacity) even if old_head > stail. + */ + *entries = capacity + stail - *old_head; + + /* check that we have enough room in ring */ + if (unlikely(n > *entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; + + if (n > 0) { + *new_head = *old_head + n; + rte_atomic_store_explicit(&d->head, *new_head, rte_memory_order_relaxed); + } + + return n; +} + +/** + * @internal This is a helper function that moves the producer/consumer head + * for use in multi-thread safe path + * + * @param d + * A pointer to the headtail structure with head value to be moved + * @param s + * A pointer to the counter-part headtail structure. 
Note that this + * function only reads tail value from it + * @param capacity + * Either ring capacity value (for producer), or zero (for consumer) + * @param n + * The number of elements we want to move head value on + * @param behavior + * RTE_RING_QUEUE_FIXED: Move on a fixed number of items + * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible + * @param old_head + * Returns head value as it was before the move + * @param new_head + * Returns the new head value + * @param entries + * Returns the number of ring entries available BEFORE head was moved + * @return + * Actual number of objects the head was moved on + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only + */ +static __rte_always_inline unsigned int +__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d, + const struct rte_ring_headtail *s, uint32_t capacity, + unsigned int n, + enum rte_ring_queue_behavior behavior, + uint32_t *old_head, uint32_t *new_head, uint32_t *entries) +{ + uint32_t stail; + bool success; unsigned int max = n; /* @@ -120,25 +182,21 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d, return 0; *new_head = *old_head + n; - if (is_st) { - d->head = *new_head; - success = 1; - } else - /* on failure, *old_head is updated */ - /* - * R1/A2. - * R1: Establishes a synchronizing edge with A0 of a - * different thread. - * A2: Establishes a synchronizing edge with R1 of a - * different thread to observe same value for stail - * observed by that thread on CAS failure (to retry - * with an updated *old_head). - */ - success = rte_atomic_compare_exchange_strong_explicit( + /* on failure, *old_head is updated */ + /* + * R1/A2. + * R1: Establishes a synchronizing edge with A0 of a + * different thread. + * A2: Establishes a synchronizing edge with R1 of a + * different thread to observe same value for stail + * observed by that thread on CAS failure (to retry + * with an updated *old_head). 
+ */ + success = rte_atomic_compare_exchange_strong_explicit( &d->head, old_head, *new_head, rte_memory_order_release, rte_memory_order_acquire); - } while (unlikely(success == 0)); + } while (unlikely(!success)); return n; } diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h index 6eafae121f..a0fdec9812 100644 --- a/lib/ring/rte_ring_elem_pvt.h +++ b/lib/ring/rte_ring_elem_pvt.h @@ -341,8 +341,12 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, uint32_t *old_head, uint32_t *new_head, uint32_t *free_entries) { - return __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity, - is_sp, n, behavior, old_head, new_head, free_entries); + if (is_sp) + return __rte_ring_headtail_move_head_st(&r->prod, &r->cons, r->capacity, + n, behavior, old_head, new_head, free_entries); + else + return __rte_ring_headtail_move_head_mt(&r->prod, &r->cons, r->capacity, + n, behavior, old_head, new_head, free_entries); } /** @@ -374,8 +378,12 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc, uint32_t *old_head, uint32_t *new_head, uint32_t *entries) { - return __rte_ring_headtail_move_head(&r->cons, &r->prod, 0, - is_sc, n, behavior, old_head, new_head, entries); + if (is_sc) + return __rte_ring_headtail_move_head_st(&r->cons, &r->prod, 0, + n, behavior, old_head, new_head, entries); + else + return __rte_ring_headtail_move_head_mt(&r->cons, &r->prod, 0, + n, behavior, old_head, new_head, entries); } /** diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h index affd2d5ba7..c044b0824f 100644 --- a/lib/ring/rte_ring_generic_pvt.h +++ b/lib/ring/rte_ring_generic_pvt.h @@ -42,6 +42,7 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, /** * @internal This is a helper function that moves the producer/consumer head + * for use in multi-thread safe path * * @param d * A pointer to the headtail structure with head value to be moved @@ -50,8 +51,6 @@ __rte_ring_update_tail(struct 
rte_ring_headtail *ht, uint32_t old_val, * function only reads tail value from it * @param capacity * Either ring capacity value (for producer), or zero (for consumer) - * @param is_st - * Indicates whether multi-thread safe path is needed or not * @param n * The number of elements we want to move head value on * @param behavior @@ -68,10 +67,9 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only */ static __rte_always_inline unsigned int -__rte_ring_headtail_move_head(struct rte_ring_headtail *d, +__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d, const struct rte_ring_headtail *s, uint32_t capacity, - unsigned int is_st, unsigned int n, - enum rte_ring_queue_behavior behavior, + unsigned int n, enum rte_ring_queue_behavior behavior, uint32_t *old_head, uint32_t *new_head, uint32_t *entries) { unsigned int max = n; @@ -105,15 +103,70 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d, return 0; *new_head = *old_head + n; - if (is_st) { - d->head = *new_head; - success = 1; - } else - success = rte_atomic32_cmpset( - (uint32_t *)(uintptr_t)&d->head, - *old_head, *new_head); + success = rte_atomic32_cmpset( + (uint32_t *)(uintptr_t)&d->head, + *old_head, *new_head); } while (unlikely(success == 0)); return n; } +/** + * @internal This is a helper function that moves the producer/consumer head + * optimized for single threaded case + * + * @param d + * A pointer to the headtail structure with head value to be moved + * @param s + * A pointer to the counter-part headtail structure. 
Note that this + * function only reads tail value from it + * @param capacity + * Either ring capacity value (for producer), or zero (for consumer) + * @param n + * The number of elements we want to move head value on + * @param behavior + * RTE_RING_QUEUE_FIXED: Move on a fixed number of items + * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible + * @param old_head + * Returns head value as it was before the move + * @param new_head + * Returns the new head value + * @param entries + * Returns the number of ring entries available BEFORE head was moved + * @return + * Actual number of objects the head was moved on + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only + */ +static __rte_always_inline unsigned int +__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d, + const struct rte_ring_headtail *s, uint32_t capacity, + unsigned int n, + enum rte_ring_queue_behavior behavior, + uint32_t *old_head, uint32_t *new_head, uint32_t *entries) +{ + *old_head = d->head; + + /* add rmb barrier to avoid load/load reorder in weak + * memory model. It is noop on x86 + */ + rte_smp_rmb(); + + /* + * The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > s->tail). So 'entries' is always between 0 + * and capacity (which is < size). + */ + *entries = (capacity + s->tail - *old_head); + + /* check that we have enough room in ring */ + if (unlikely(n > *entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 
0 : *entries; + + if (likely(n > 0)) { + *new_head = *old_head + n; + d->head = *new_head; + } + return n; +} + #endif /* _RTE_RING_GENERIC_PVT_H_ */ diff --git a/lib/ring/soring.c b/lib/ring/soring.c index e9c75619fe..22f9c60e9c 100644 --- a/lib/ring/soring.c +++ b/lib/ring/soring.c @@ -135,9 +135,12 @@ __rte_soring_move_prod_head(struct rte_soring *r, uint32_t num, switch (st) { case RTE_RING_SYNC_ST: + n = __rte_ring_headtail_move_head_st(&r->prod.ht, &r->cons.ht, + r->capacity, num, behavior, head, next, free); + break; case RTE_RING_SYNC_MT: - n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht, - r->capacity, st, num, behavior, head, next, free); + n = __rte_ring_headtail_move_head_mt(&r->prod.ht, &r->cons.ht, + r->capacity, num, behavior, head, next, free); break; case RTE_RING_SYNC_MT_RTS: n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht, @@ -168,9 +171,13 @@ __rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num, switch (st) { case RTE_RING_SYNC_ST: + n = __rte_ring_headtail_move_head_st(&r->cons.ht, + &r->stage[stage].ht, 0, num, behavior, + head, next, avail); + break; case RTE_RING_SYNC_MT: - n = __rte_ring_headtail_move_head(&r->cons.ht, - &r->stage[stage].ht, 0, st, num, behavior, + n = __rte_ring_headtail_move_head_mt(&r->cons.ht, + &r->stage[stage].ht, 0, num, behavior, head, next, avail); break; case RTE_RING_SYNC_MT_RTS: @@ -309,9 +316,8 @@ soring_enqueue_start(struct rte_soring *r, uint32_t num, switch (st) { case RTE_RING_SYNC_ST: - n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht, - r->capacity, RTE_RING_SYNC_ST, num, behavior, - &head, &next, &free); + n = __rte_ring_headtail_move_head_st(&r->prod.ht, &r->cons.ht, + r->capacity, num, behavior, &head, &next, &free); break; case RTE_RING_SYNC_MT_HTS: n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht, @@ -419,8 +425,8 @@ soring_dequeue_start(struct rte_soring *r, void *objs, void *meta, switch (st) { case RTE_RING_SYNC_ST: - n = 
__rte_ring_headtail_move_head(&r->cons.ht, &r->stage[ns].ht, - 0, RTE_RING_SYNC_ST, num, behavior, &head, &next, + n = __rte_ring_headtail_move_head_st(&r->cons.ht, &r->stage[ns].ht, + 0, num, behavior, &head, &next, &avail); break; case RTE_RING_SYNC_MT_HTS: -- 2.53.0

