> -----Original Message-----
> From: Gavin Hu (Arm Technology China)
> Sent: Thursday, March 7, 2019 6:45 PM
> To: Ilya Maximets <i.maxim...@samsung.com>; dev@dpdk.org
> Cc: nd <n...@arm.com>; tho...@monjalon.net; jer...@marvell.com;
> hemant.agra...@nxp.com; nipun.gu...@nxp.com; Honnappa Nagarahalli
> <honnappa.nagaraha...@arm.com>; olivier.m...@6wind.com;
> bruce.richard...@intel.com; konstantin.anan...@intel.com;
> chao...@linux.vnet.ibm.com
> Subject: RE: [PATCH v2] ring: enforce reading the tails before ring operations
> 
> 
> 
> > -----Original Message-----
> > From: Ilya Maximets <i.maxim...@samsung.com>
> > Sent: Thursday, March 7, 2019 5:48 PM
> > To: Gavin Hu (Arm Technology China) <gavin...@arm.com>;
> dev@dpdk.org
> > Cc: nd <n...@arm.com>; tho...@monjalon.net; jer...@marvell.com;
> > hemant.agra...@nxp.com; nipun.gu...@nxp.com; Honnappa Nagarahalli
> > <honnappa.nagaraha...@arm.com>; olivier.m...@6wind.com
> > Subject: Re: [PATCH v2] ring: enforce reading the tails before ring
> > operations
> >
> > On 07.03.2019 12:27, Gavin Hu (Arm Technology China) wrote:
> > >
> > >
> > >> -----Original Message-----
> > >> From: Ilya Maximets <i.maxim...@samsung.com>
> > >> Sent: Thursday, March 7, 2019 4:52 PM
> > >> To: Gavin Hu (Arm Technology China) <gavin...@arm.com>;
> > >> dev@dpdk.org
> > >> Cc: nd <n...@arm.com>; tho...@monjalon.net; jer...@marvell.com;
> > >> hemant.agra...@nxp.com; nipun.gu...@nxp.com; Honnappa
> Nagarahalli
> > >> <honnappa.nagaraha...@arm.com>; olivier.m...@6wind.com
> > >> Subject: Re: [PATCH v2] ring: enforce reading the tails before ring
> > >> operations
> > >>
> > >> On 07.03.2019 9:45, gavin hu wrote:
> > >>> In weak memory models, like arm64, reading the {prod,cons}.tail may
> > >>> get reordered after reading or writing the ring slots, which corrupts
> > >>> the ring and stale data is observed.
> > >>>
> > >>> This issue was reported by NXP on 8-A72 DPAA2 board. The problem is
> > >>> most likely caused by missing the acquire semantics when reading
> > >>> cons.tail (in SP enqueue) or prod.tail (in SC dequeue) which makes it
> > >>> possible to read a stale value from the ring slots.
> > >>>
> > >>> For MP (and MC) case, rte_atomic32_cmpset() already provides the
> > >>> required ordering. This patch is to prevent reading and writing the
> > >>> ring slots get reordered before reading {prod,cons}.tail for SP (and
> > >>> SC) case.
> > >>
> > >> A read barrier rte_smp_rmb() is OK to prevent reads of the ring from
> > >> getting reordered before reading the tail. However, to prevent *writes*
> > >> to the ring from getting reordered *before reading* the tail you need a
> > >> full memory barrier, i.e. rte_smp_mb().
> > >
> > > ISHLD (rte_smp_rmb is DMB(ishld)) orders LD/LD and LD/ST, while WMB
> > > (the ST option) orders ST/ST.
> > > For more details, please refer to Table B2-1 "Encoding of the DMB and
> > > DSB <option> parameter" in
> > > https://developer.arm.com/docs/ddi0487/latest/arm-architecture-reference-manual-armv8-for-armv8-a-architecture-profile
> >
> > I see. But you have to change the rte_smp_rmb() function definition in
> > lib/librte_eal/common/include/generic/rte_atomic.h and ensure that all
> > other architectures follow the same rules.
> > Otherwise, this change is logically wrong, because the read barrier, as
> > currently defined, cannot be used to order a Load with a Store.
> >
> 
> Good points, let me re-think how to handle the other architectures.
> A full MB is required for other architectures (x86? PPC?), but for arm, a
> read barrier (load/load and load/store) is enough.

Hi Ilya,

I would expand the rmb definition to cover load/store, in addition to load/load.
For x86, with its strong memory ordering model, rmb already covers the
load/load and load/store orderings in question and is implemented as just a
compiler barrier (rte_compiler_barrier); arm32 is a similar case.
For PPC, both 32-bit and 64-bit, rmb = wmb = mb: sync orders load/load,
load/store, store/load and store/store (lwsync orders all of these except
store/load); see the table on this page:
https://www.ibm.com/developerworks/systems/articles/powerpc.html
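
As a purely illustrative side note (paraphrased from memory, not verbatim
from the DPDK tree; the _arm64/_x86/_ppc64 suffixes are only there to put
the three variants side by side), the definitions in question boil down to
roughly this:

/* arm64: DMB ISHLD orders a load against later loads AND later stores */
#define rte_smp_rmb_arm64()  asm volatile("dmb ishld" : : : "memory")

/* x86: the hardware never reorders a load with a later load or store,
 * so a compiler barrier is sufficient */
#define rte_smp_rmb_x86()    asm volatile("" : : : "memory")

/* ppc64: a full "sync" is used here, which orders every combination */
#define rte_smp_rmb_ppc64()  asm volatile("sync" : : : "memory")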

In summary, is it safe to expand this definition for all the architectures
DPDK supports?
Any comments are welcome!
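
As an aside, another way to express the same requirement in the SP/SC paths
touched by the patch quoted below would be a C11 acquire load of the opposing
tail. A rough sketch only, with names and structure simplified (this is not
the actual rte_ring code, and it is meant purely as a comparison point for
the ordering discussion, not as a proposed change):

#include <stdint.h>

/* Simplified stand-ins for the producer/consumer head-tail pairs */
struct headtail {
        volatile uint32_t head;
        volatile uint32_t tail;
};

/* Single-producer head move: the acquire load of cons.tail keeps the
 * later reads/writes of the ring slots from being hoisted above it,
 * which is exactly the reordering discussed in this thread. */
static inline uint32_t
sp_move_prod_head(struct headtail *prod, struct headtail *cons,
                  uint32_t capacity, uint32_t n)
{
        uint32_t old_head = prod->head;
        uint32_t cons_tail = __atomic_load_n(&cons->tail, __ATOMIC_ACQUIRE);
        uint32_t free_entries = capacity + cons_tail - old_head;

        if (n > free_entries)
                return 0;

        prod->head = old_head + n;
        return n;
}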

BR. Gavin

> > >
> > >>
> > >>>
> > >>> Signed-off-by: gavin hu <gavin...@arm.com>
> > >>> Reviewed-by: Ola Liljedahl <ola.liljed...@arm.com>
> > >>> Tested-by: Nipun Gupta <nipun.gu...@nxp.com>
> > >>> ---
> > >>>  lib/librte_ring/rte_ring_generic.h | 16 ++++++++++------
> > >>>  1 file changed, 10 insertions(+), 6 deletions(-)
> > >>>
> > >>> diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
> > >>> index ea7dbe5..1bd3dfd 100644
> > >>> --- a/lib/librte_ring/rte_ring_generic.h
> > >>> +++ b/lib/librte_ring/rte_ring_generic.h
> > >>> @@ -90,9 +90,11 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
> > >>>                         return 0;
> > >>>
> > >>>                 *new_head = *old_head + n;
> > >>> -               if (is_sp)
> > >>> -                       r->prod.head = *new_head, success = 1;
> > >>> -               else
> > >>> +               if (is_sp) {
> > >>> +                       r->prod.head = *new_head;
> > >>> +                       rte_smp_rmb();
> > >>> +                       success = 1;
> > >>> +               } else
> > >>>                         success = rte_atomic32_cmpset(&r->prod.head,
> > >>>                                         *old_head, *new_head);
> > >>>         } while (unlikely(success == 0));
> > >>> @@ -158,9 +160,11 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
> > >>>                         return 0;
> > >>>
> > >>>                 *new_head = *old_head + n;
> > >>> -               if (is_sc)
> > >>> -                       r->cons.head = *new_head, success = 1;
> > >>> -               else
> > >>> +               if (is_sc) {
> > >>> +                       r->cons.head = *new_head;
> > >>> +                       rte_smp_rmb();
> > >>> +                       success = 1;
> > >>> +               } else
> > >>>                         success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> > >>>                                         *new_head);
> > >>>         } while (unlikely(success == 0));
> > >>>
