Hi Honnappa,

>
> Hi Konstantin,
> Few nits/comments inline.
>
> <snip>
>
> > diff --git a/lib/librte_ring/rte_ring_hts.h b/lib/librte_ring/rte_ring_hts.h
> > new file mode 100644
> > index 000000000..062d7be6c
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_hts.h
> > @@ -0,0 +1,210 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2010-2017 Intel Corporation
> > + * Copyright (c) 2007-2009 Kip Macy km...@freebsd.org
> > + * All rights reserved.
> > + * Derived from FreeBSD's bufring.h
> > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > + */
> > +
> > +#ifndef _RTE_RING_HTS_H_
> > +#define _RTE_RING_HTS_H_
> > +
> > +/**
> > + * @file rte_ring_hts.h
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + * It is not recommended to include this file directly.
> > + * Please include <rte_ring.h> instead.
> > + *
> > + * Contains functions for serialized, aka Head-Tail Sync (HTS) ring mode.
> > + * In that mode enqueue/dequeue operation is fully serialized:
> > + * at any given moement only one enqueue/dequeue operation can proceed.
>                    ^^^^^^^^ moment
> > + * This is achieved by thread is allowed to proceed with changing head.value
>                           ^^^^^^^^^^^^^^ allowing a thread
> > + * only when head.value == tail.value.
> > + * Both head and tail values are updated atomically (as one 64-bit value).
> > + * To achieve that 64-bit CAS is used by head update routine.
> > + */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include <rte_ring_hts_generic.h>
> > +
> <snip>
>
> > diff --git a/lib/librte_ring/rte_ring_hts_generic.h b/lib/librte_ring/rte_ring_hts_generic.h
> > new file mode 100644
> > index 000000000..da08f1d94
> > --- /dev/null
> > +++ b/lib/librte_ring/rte_ring_hts_generic.h
> > @@ -0,0 +1,198 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2010-2020 Intel Corporation
> > + * Copyright (c) 2007-2009 Kip Macy km...@freebsd.org
> > + * All rights reserved.
> > + * Derived from FreeBSD's bufring.h
> > + * Used as BSD-3 Licensed with permission from Kip Macy.
> > + */
> > +
> > +#ifndef _RTE_RING_HTS_GENERIC_H_
> > +#define _RTE_RING_HTS_GENERIC_H_
> > +
> > +/**
> > + * @file rte_ring_hts_generic.h
> > + * It is not recommended to include this file directly,
> > + * include <rte_ring.h> instead.
> > + * Contains internal helper functions for head/tail sync (HTS) ring mode.
> > + * For more information please refer to <rte_ring_hts.h>.
> > + */
> > +
> > +static __rte_always_inline void
> > +__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t num,
> > +	uint32_t enqueue)
> > +{
> > +	union rte_ring_ht_pos p;
> > +
> > +	if (enqueue)
> > +		rte_smp_wmb();
> > +	else
> > +		rte_smp_rmb();
> > +
> > +	p.raw = rte_atomic64_read((rte_atomic64_t *)(uintptr_t)&ht->ht.raw);
> This read can be avoided if the new head can be returned from
> '__rte_ring_hts_head_wait'.

Yes, or even cur_head and num should be enough to avoid the read here.

>
> > +
> > +	p.pos.head = p.pos.tail + num;
> > +	p.pos.tail = p.pos.head;
> > +
> > +	rte_atomic64_set((rte_atomic64_t *)(uintptr_t)&ht->ht.raw, p.raw);
> > +}
> Why not use 32b atomic operation here and update just the tail?

Agreed, this code path can do just a 32-bit store (as head is not going to change).
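Something like the sketch below is what I have in mind. Rough sketch only - the extra
'tail' parameter (the old head value returned by the move_prod/cons_head routines) and
the exact signature are made up here for illustration, not the final code:

	static __rte_always_inline void
	__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t tail,
		uint32_t num, uint32_t enqueue)
	{
		if (enqueue)
			rte_smp_wmb();
		else
			rte_smp_rmb();

		/*
		 * head was already advanced by the acquire CAS in
		 * __rte_ring_hts_move_prod_head/__rte_ring_hts_move_cons_head and
		 * cannot change until tail catches up with it, so a plain 32-bit
		 * store of the new tail value is enough - no need to re-read the
		 * 64-bit head/tail pair here.
		 */
		ht->ht.pos.tail = tail + num;
	}

That would remove both the rte_atomic64_read() and the 64-bit rte_atomic64_set()
from the release path.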
>
> > +
> > +/**
> > + * @internal waits till tail will become equal to head.
> > + * Means no writer/reader is active for that ring.
> > + * Suppose to work as serialization point.
> > + */
> > +static __rte_always_inline void
> > +__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
> > +	union rte_ring_ht_pos *p)
> > +{
> > +	p->raw = rte_atomic64_read((rte_atomic64_t *)
> > +			(uintptr_t)&ht->ht.raw);
> > +
> > +	while (p->pos.head != p->pos.tail) {
> > +		rte_pause();
> > +		p->raw = rte_atomic64_read((rte_atomic64_t *)
> > +				(uintptr_t)&ht->ht.raw);
> > +	}
> > +}
> > +
> > +/**
> > + * @internal This function updates the producer head for enqueue
> > + *
> > + * @param r
> > + *   A pointer to the ring structure
> > + * @param is_sp
> > + *   Indicates whether multi-producer path is needed or not
> > + * @param n
> > + *   The number of elements we will want to enqueue, i.e. how far should the
> > + *   head be moved
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> > + * @param old_head
> > + *   Returns head value as it was before the move, i.e. where enqueue starts
> > + * @param new_head
> > + *   Returns the current/new head value i.e. where enqueue finishes

Oops, copy/paste leftover - will remove.

> Would be good to return the new_head from this function and use it in
> '__rte_ring_hts_update_tail'.

I think old_head + num should be enough here (see above).

>
> > + * @param free_entries
> > + *   Returns the amount of free space in the ring BEFORE head was moved
> > + * @return
> > + *   Actual number of objects enqueued.
> > + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> Minor, suggest removing the elaborate comments, it is not required and
> difficult to maintain.
> I think we should do the same thing for other files too.

Sorry, I didn't get you here: what exactly do you suggest removing?

>
> > +static __rte_always_inline unsigned int
> > +__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num,
> > +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> > +	uint32_t *free_entries)
> > +{
> > +	uint32_t n;
> > +	union rte_ring_ht_pos np, op;
> > +
> > +	const uint32_t capacity = r->capacity;
> > +
> > +	do {
> > +		/* Reset n to the initial burst count */
> > +		n = num;
> > +
> > +		/* wait for tail to be equal to head */
> > +		__rte_ring_hts_head_wait(&r->hts_prod, &op);
> > +
> > +		/* add rmb barrier to avoid load/load reorder in weak
> > +		 * memory model. It is noop on x86
> > +		 */
> > +		rte_smp_rmb();
> > +
> > +		/*
> > +		 * The subtraction is done between two unsigned 32bits value
> > +		 * (the result is always modulo 32 bits even if we have
> > +		 * *old_head > cons_tail). So 'free_entries' is always between 0
> > +		 * and capacity (which is < size).
> > +		 */
> > +		*free_entries = capacity + r->cons.tail - op.pos.head;
> > +
> > +		/* check that we have enough room in ring */
> > +		if (unlikely(n > *free_entries))
> > +			n = (behavior == RTE_RING_QUEUE_FIXED) ?
> > +					0 : *free_entries;
> > +
> > +		if (n == 0)
> > +			break;
> > +
> > +		np.pos.tail = op.pos.tail;
> > +		np.pos.head = op.pos.head + n;
> > +
> > +	} while (rte_atomic64_cmpset(&r->hts_prod.ht.raw,
> > +		op.raw, np.raw) == 0);
> I think we can use 32b atomic operation here and just update the head.

I think we have to do a proper 64-bit CAS here, otherwise an ABA race could arise:
a thread reads the head/tail values, then gets suspended just before the CAS
instruction for a while. It resumes when the ring head value is again equal to the
thread's local head value, but the tail differs (some other thread is enqueuing
into the ring). If we did the CAS just on the head, it would succeed, though it
shouldn't. I understand that with 32-bit head/tail values the probability of such
a situation is really low, but still.
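To illustrate the interleaving I have in mind (a purely hypothetical timeline; it
assumes only the 32-bit head would be CASed and that the 32-bit counter eventually
wraps around):

	/*
	 * T1: __rte_ring_hts_head_wait() sees head == tail == H and computes n.
	 * T1: gets preempted just before the CAS.
	 * T2..Tk: keep enqueuing/dequeuing; after enough operations the 32-bit
	 *         counters wrap and head is again equal to H, while another
	 *         enqueue is still in flight, so tail != H.
	 * T1: resumes; a 32-bit CAS on head alone (H -> H + n) would succeed,
	 *     even though the head == tail invariant no longer holds.
	 *
	 * The 64-bit CAS on the whole head/tail pair fails in that case, since
	 * the tail half of the compared value has changed.
	 */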
>
> > +
> > +	*old_head = op.pos.head;
> > +	return n;
> > +}
> > +
> > +/**
> > + * @internal This function updates the consumer head for dequeue
> > + *
> > + * @param r
> > + *   A pointer to the ring structure
> > + * @param is_sc
> > + *   Indicates whether multi-consumer path is needed or not
> > + * @param n
> > + *   The number of elements we will want to enqueue, i.e. how far should the
> > + *   head be moved
> > + * @param behavior
> > + *   RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
> > + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> > + * @param old_head
> > + *   Returns head value as it was before the move, i.e. where dequeue starts
> > + * @param new_head
> > + *   Returns the current/new head value i.e. where dequeue finishes
> > + * @param entries
> > + *   Returns the number of entries in the ring BEFORE head was moved
> > + * @return
> > + *   - Actual number of objects dequeued.
> > + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> > + */
> > +static __rte_always_inline unsigned int
> > +__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num,
> > +	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
> > +	uint32_t *entries)
> > +{
> > +	uint32_t n;
> > +	union rte_ring_ht_pos np, op;
> > +
> > +	/* move cons.head atomically */
> > +	do {
> > +		/* Restore n as it may change every loop */
> > +		n = num;
> > +
> > +		/* wait for tail to be equal to head */
> > +		__rte_ring_hts_head_wait(&r->hts_cons, &op);
> > +
> > +		/* add rmb barrier to avoid load/load reorder in weak
> > +		 * memory model. It is noop on x86
> > +		 */
> > +		rte_smp_rmb();
> > +
> > +		/* The subtraction is done between two unsigned 32bits value
> > +		 * (the result is always modulo 32 bits even if we have
> > +		 * cons_head > prod_tail). So 'entries' is always between 0
> > +		 * and size(ring)-1.
> > +		 */
> > +		*entries = r->prod.tail - op.pos.head;
> > +
> > +		/* Set the actual entries for dequeue */
> > +		if (n > *entries)
> > +			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> > +
> > +		if (unlikely(n == 0))
> > +			break;
> > +
> > +		np.pos.tail = op.pos.tail;
> > +		np.pos.head = op.pos.head + n;
> > +
> > +	} while (rte_atomic64_cmpset(&r->hts_cons.ht.raw,
> > +		op.raw, np.raw) == 0);
> > +
> > +	*old_head = op.pos.head;
> > +	return n;
> > +}
> > +
> > +#endif /* _RTE_RING_HTS_GENERIC_H_ */
> > --
> > 2.17.1