Hi Jerin

On 11/3/2017 1:23 AM, Jerin Jacob wrote:
-----Original Message-----
Date: Thu,  2 Nov 2017 08:43:30 +0000
From: Jia He <hejia...@gmail.com>
To: jerin.ja...@caviumnetworks.com, dev@dpdk.org, olivier.m...@6wind.com
Cc: konstantin.anan...@intel.com, bruce.richard...@intel.com,
  jianbo....@arm.com, hemant.agra...@nxp.com, Jia He <hejia...@gmail.com>,
  jie2....@hxt-semitech.com, bing.z...@hxt-semitech.com,
  jia...@hxt-semitech.com
Subject: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
X-Mailer: git-send-email 2.7.4

We observed an rte panic in mbuf_autotest on our Qualcomm arm64 server.
As for the possible race condition, please refer to [1].
Hi Jia,

In addition to Konstantin's comments, please find some review
comments below.
Furthermore, there are 2 options, as suggested by Jerin:
1. use rte_smp_rmb
2. use load_acquire/store_release (refer to [2]).
CONFIG_RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER is provided, and by default it is 'n'.
I think the better name would be CONFIG_RTE_RING_USE_C11_MEM_MODEL
or something like that.
OK, but then how do we distinguish between the following 2 options?
CONFIG_RTE_RING_USE_C11_MEM_MODEL alone doesn't seem to be enough:

1. use rte_smp_rmb
2. use load_acquire/store_release (refer to [2]); a rough sketch of the difference is below.
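
To make it concrete, here is a sketch of the consumer-side head/tail loads under the two options (the helper names are invented for illustration, and option 2 assumes the arch wrapper maps to the GCC/Clang __atomic builtins):

/* Option 1: explicit read barrier between the two plain loads. */
static inline uint32_t
load_cons_head_rmb(struct rte_ring *r, uint32_t *old_head)
{
        *old_head = r->cons.head;
        rte_smp_rmb();          /* cons.head must be loaded before prod.tail */
        return r->prod.tail;
}

/* Option 2: one-direction acquire barrier folded into the first load. */
static inline uint32_t
load_cons_head_acquire(struct rte_ring *r, uint32_t *old_head)
{
        *old_head = __atomic_load_n(&r->cons.head, __ATOMIC_ACQUIRE);
        return r->prod.tail;    /* cannot be hoisted above the acquire */
}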


---
Changelog:
V2: let users choose whether to use load_acquire/store_release
V1: rte_smp_rmb() between 2 loads

Signed-off-by: Jia He <hejia...@gmail.com>
Signed-off-by: jie2....@hxt-semitech.com
Signed-off-by: bing.z...@hxt-semitech.com
Signed-off-by: jia...@hxt-semitech.com
Suggested-by: jerin.ja...@caviumnetworks.com
---
  lib/librte_ring/Makefile           |  4 +++-
  lib/librte_ring/rte_ring.h         | 38 ++++++++++++++++++++++++------
  lib/librte_ring/rte_ring_arm64.h   | 48 ++++++++++++++++++++++++++++++++++++++
  lib/librte_ring/rte_ring_generic.h | 45 +++++++++++++++++++++++++++++++++++
  4 files changed, 127 insertions(+), 8 deletions(-)
  create mode 100644 lib/librte_ring/rte_ring_arm64.h
  create mode 100644 lib/librte_ring/rte_ring_generic.h

diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index e34d9d9..fa57a86 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -45,6 +45,8 @@ LIBABIVER := 1
  SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
# install includes
-SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h
+SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
+                                       rte_ring_arm64.h \
It is really not specific to arm64. We could rename it to rte_ring_c11_mem.h or
something like that, to reflect that the implementation is based on the C11
memory model.


+                                       rte_ring_generic.h
include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
index 5e9b3b7..943b1f9 100644
--- a/lib/librte_ring/rte_ring.h
+++ b/lib/librte_ring/rte_ring.h
@@ -103,6 +103,18 @@ extern "C" {
  #include <rte_memzone.h>
  #include <rte_pause.h>
+/* In strong memory models (e.g. x86), there is no need to add an rmb()
+ * between two loads.
+ * In weak models (powerpc/arm), there are 2 choices for the user:
+ * 1. use an rmb() memory barrier
+ * 2. use a one-direction load_acquire/store_release barrier
+ * Which one is better depends on performance test results. */
+#ifdef RTE_ARCH_ARM64
s/RTE_ARCH_ARM64/RTE_RING_USE_C11_MEM_MODEL/ and update the generic arm64 config.
That way it can be used by other architectures, like ppc, if they choose to do
so.
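
Something like this in rte_ring.h, keyed off the config knob rather than the architecture (just a sketch of the suggestion; the header name follows the rename proposed above):

#ifdef RTE_RING_USE_C11_MEM_MODEL
#include "rte_ring_c11_mem.h"
#else
#include "rte_ring_generic.h"
#endif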


+#include "rte_ring_arm64.h"
+#else
+#include "rte_ring_generic.h"
+#endif
+
  #define RTE_TAILQ_RING_NAME "RTE_RING"
enum rte_ring_queue_behavior {
@@ -368,7 +380,7 @@ update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
                while (unlikely(ht->tail != old_val))
                        rte_pause();
-       ht->tail = new_val;
+       arch_rte_atomic_store(&ht->tail, new_val, __ATOMIC_RELEASE);
  }
/**
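
For reference, a sketch of what the whole helper might look like once moved into the c11 header, with arch_rte_atomic_store() expanded to the GCC/Clang builtin it presumably wraps (the `single` parameter is reconstructed from the call site in the enqueue path below):

static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
                uint32_t single)
{
        /* Wait for earlier enqueues/dequeues by other lcores to finish. */
        if (!single)
                while (unlikely(ht->tail != old_val))
                        rte_pause();

        /* Release store: the ring-slot writes performed before this call
         * become visible to other lcores no later than the new tail. */
        __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
}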
@@ -408,7 +420,8 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
                /* Reset n to the initial burst count */
                n = max;
-       *old_head = r->prod.head;
+               *old_head = arch_rte_atomic_load(&r->prod.head,
+                                               __ATOMIC_ACQUIRE);
Same as Konstantin's comment, i.e. move this function to the c11 memory model
header file; see the sketch after this hunk.

                const uint32_t cons_tail = r->cons.tail;
                /*
                 *  The subtraction is done between two unsigned 32bits value
@@ -430,8 +443,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
                if (is_sp)
                        r->prod.head = *new_head, success = 1;
                else
-                       success = rte_atomic32_cmpset(&r->prod.head,
-                                       *old_head, *new_head);
+                       success = arch_rte_atomic32_cmpset(&r->prod.head,
+                                       old_head, *new_head,
+                                       0, __ATOMIC_ACQUIRE,
+                                       __ATOMIC_RELAXED);
        } while (unlikely(success == 0));
        return n;
  }
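
For reference, a sketch of the same compare-and-set with the builtin spelled out. Note that __atomic_compare_exchange_n() writes the currently observed value back through old_head on failure, which is why the patch now passes the pointer instead of *old_head:

        success = __atomic_compare_exchange_n(&r->prod.head,
                        old_head, *new_head,
                        0,                   /* strong CAS */
                        __ATOMIC_ACQUIRE,    /* success: acts as the head load */
                        __ATOMIC_RELAXED);   /* failure: the loop retries */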
@@ -470,7 +485,10 @@ __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
                goto end;
ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
+
+#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
This #define will be removed if the function moves.

        rte_smp_wmb();
+#endif
update_tail(&r->prod, prod_head, prod_next, is_sp);
  end:
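
For reference, once the function moves into the per-model headers, the c11 copy can drop the conditional barrier entirely (a sketch):

        /* No rte_smp_wmb() needed: the __ATOMIC_RELEASE store at the end of
         * update_tail() already orders the ENQUEUE_PTRS() writes before the
         * new tail becomes visible to consumers. */
        ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
        update_tail(&r->prod, prod_head, prod_next, is_sp);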
@@ -516,7 +534,8 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
                /* Restore n as it may change every loop */
                n = max;
-       *old_head = r->cons.head;
+               *old_head = arch_rte_atomic_load(&r->cons.head,
+                                               __ATOMIC_ACQUIRE);
                const uint32_t prod_tail = r->prod.tail;
                /* The subtraction is done between two unsigned 32bits value
                 * (the result is always modulo 32 bits even if we have
@@ -535,8 +554,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
                if (is_sc)
                        r->cons.head = *new_head, success = 1;
                else
-                       success = rte_atomic32_cmpset(&r->cons.head, *old_head,
-                                       *new_head);
+                       success = arch_rte_atomic32_cmpset(&r->cons.head,
+                                       old_head, *new_head,
+                                       0, __ATOMIC_ACQUIRE,
+                                       __ATOMIC_RELAXED);
        } while (unlikely(success == 0));
        return n;
  }
@@ -575,7 +596,10 @@ __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
                goto end;
DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
+
+#ifndef RTE_ATOMIC_ACQUIRE_RELEASE_BARRIER_PREFER
This #define will be removed if the function moves.

        rte_smp_rmb();
+#endif

--
Cheers,
Jia
