Move the statistics from the service data structure to the per-lcore
struct. This eliminates contention for the counter cache lines, which
decreases the producer-side statistics overhead for services deployed
across many lcores.

Prior to this patch, enabling statistics for a service with a
per-service function call latency of 1000 clock cycles deployed across
16 cores on an Intel Xeon 6230N @ 2.3 GHz would incur a cost of ~10000
core clock cycles per service call. After this patch, the statistics
overhead is reduced to 22 clock cycles per call.

Signed-off-by: Mattias Rönnblom <mattias.ronnb...@ericsson.com>
---
 lib/eal/common/rte_service.c | 182 +++++++++++++++++++++++------------
 1 file changed, 121 insertions(+), 61 deletions(-)

diff --git a/lib/eal/common/rte_service.c b/lib/eal/common/rte_service.c
index 94cb056196..b5103f2a20 100644
--- a/lib/eal/common/rte_service.c
+++ b/lib/eal/common/rte_service.c
@@ -50,17 +50,8 @@ struct rte_service_spec_impl {
         * on currently.
         */
        uint32_t num_mapped_cores;
-
-       /* 32-bit builds won't naturally align a uint64_t, so force alignment,
-        * allowing regular reads to be atomic.
-        */
-       uint64_t calls __rte_aligned(8);
-       uint64_t cycles_spent __rte_aligned(8);
 } __rte_cache_aligned;
 
-/* Mask used to ensure uint64_t 8 byte vars are naturally aligned. */
-#define RTE_SERVICE_STAT_ALIGN_MASK (8 - 1)
-
 /* the internal values of a service core */
 struct core_state {
        /* map of services IDs are run on this core */
@@ -71,6 +62,7 @@ struct core_state {
        uint8_t service_active_on_lcore[RTE_SERVICE_NUM_MAX];
        uint64_t loops;
        uint64_t calls_per_service[RTE_SERVICE_NUM_MAX];
+       uint64_t cycles_per_service[RTE_SERVICE_NUM_MAX];
 } __rte_cache_aligned;
 
 static uint32_t rte_service_count;
@@ -138,13 +130,16 @@ rte_service_finalize(void)
        rte_service_library_initialized = 0;
 }
 
-/* returns 1 if service is registered and has not been unregistered
- * Returns 0 if service never registered, or has been unregistered
- */
-static inline int
+static inline bool
+service_registered(uint32_t id)
+{
+       return rte_services[id].internal_flags & SERVICE_F_REGISTERED;
+}
+
+static inline bool
 service_valid(uint32_t id)
 {
-       return !!(rte_services[id].internal_flags & SERVICE_F_REGISTERED);
+       return id < RTE_SERVICE_NUM_MAX && service_registered(id);
 }
 
 static struct rte_service_spec_impl *
@@ -155,7 +150,7 @@ service_get(uint32_t id)
 
 /* validate ID and retrieve service pointer, or return error value */
 #define SERVICE_VALID_GET_OR_ERR_RET(id, service, retval) do {          \
-       if (id >= RTE_SERVICE_NUM_MAX || !service_valid(id))            \
+       if (!service_valid(id))                                         \
                return retval;                                          \
        service = &rte_services[id];                                    \
 } while (0)
@@ -217,7 +212,7 @@ rte_service_get_by_name(const char *name, uint32_t *service_id)
 
        int i;
        for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
-               if (service_valid(i) &&
+               if (service_registered(i) &&
                                strcmp(name, rte_services[i].spec.name) == 0) {
                        *service_id = i;
                        return 0;
@@ -254,7 +249,7 @@ rte_service_component_register(const struct rte_service_spec *spec,
                return -EINVAL;
 
        for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
-               if (!service_valid(i)) {
+               if (!service_registered(i)) {
                        free_slot = i;
                        break;
                }
@@ -366,29 +361,24 @@ service_runner_do_callback(struct rte_service_spec_impl *s,
 {
        void *userdata = s->spec.callback_userdata;
 
-       /* Ensure the atomically stored variables are naturally aligned,
-        * as required for regular loads to be atomic.
-        */
-       RTE_BUILD_BUG_ON((offsetof(struct rte_service_spec_impl, calls)
-               & RTE_SERVICE_STAT_ALIGN_MASK) != 0);
-       RTE_BUILD_BUG_ON((offsetof(struct rte_service_spec_impl, cycles_spent)
-               & RTE_SERVICE_STAT_ALIGN_MASK) != 0);
-
        if (service_stats_enabled(s)) {
                uint64_t start = rte_rdtsc();
                s->spec.callback(userdata);
                uint64_t end = rte_rdtsc();
                uint64_t cycles = end - start;
-               cs->calls_per_service[service_idx]++;
-               if (service_mt_safe(s)) {
-                       __atomic_fetch_add(&s->cycles_spent, cycles, 
__ATOMIC_RELAXED);
-                       __atomic_fetch_add(&s->calls, 1, __ATOMIC_RELAXED);
-               } else {
-                       uint64_t cycles_new = s->cycles_spent + cycles;
-                       uint64_t calls_new = s->calls++;
-                       __atomic_store_n(&s->cycles_spent, cycles_new, 
__ATOMIC_RELAXED);
-                       __atomic_store_n(&s->calls, calls_new, 
__ATOMIC_RELAXED);
-               }
+
+               /* The lcore service worker thread is the only writer,
+                * and thus only a non-atomic load and an atomic store
+                * is needed, and not the more expensive atomic
+                * add.
+                */
+               __atomic_store_n(&cs->calls_per_service[service_idx],
+                                cs->calls_per_service[service_idx] + 1,
+                                __ATOMIC_RELAXED);
+
+               __atomic_store_n(&cs->cycles_per_service[service_idx],
+                                cs->cycles_per_service[service_idx] + cycles,
+                                __ATOMIC_RELAXED);
        } else
                s->spec.callback(userdata);
 }
@@ -436,7 +426,7 @@ rte_service_may_be_active(uint32_t id)
        int32_t lcore_count = rte_service_lcore_list(ids, RTE_MAX_LCORE);
        int i;
 
-       if (id >= RTE_SERVICE_NUM_MAX || !service_valid(id))
+       if (!service_valid(id))
                return -EINVAL;
 
        for (i = 0; i < lcore_count; i++) {
@@ -483,16 +473,17 @@ service_runner_func(void *arg)
         */
        while (__atomic_load_n(&cs->runstate, __ATOMIC_ACQUIRE) ==
                        RUNSTATE_RUNNING) {
+
                const uint64_t service_mask = cs->service_mask;
 
                for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
-                       if (!service_valid(i))
+                       if (!service_registered(i))
                                continue;
                        /* return value ignored as no change to code flow */
                        service_run(i, cs, service_mask, service_get(i), 1);
                }
 
-               cs->loops++;
+               __atomic_store_n(&cs->loops, cs->loops + 1, __ATOMIC_RELAXED);
        }
 
        /* Use SEQ CST memory ordering to avoid any re-ordering around
@@ -608,8 +599,8 @@ static int32_t
 service_update(uint32_t sid, uint32_t lcore, uint32_t *set, uint32_t *enabled)
 {
        /* validate ID, or return error value */
-       if (sid >= RTE_SERVICE_NUM_MAX || !service_valid(sid) ||
-           lcore >= RTE_MAX_LCORE || !lcore_states[lcore].is_service_core)
+       if (!service_valid(sid) || lcore >= RTE_MAX_LCORE ||
+           !lcore_states[lcore].is_service_core)
                return -EINVAL;
 
        uint64_t sid_mask = UINT64_C(1) << sid;
@@ -813,21 +804,75 @@ rte_service_lcore_stop(uint32_t lcore)
        return 0;
 }
 
+static uint64_t
+lcore_attr_get_loops(unsigned int lcore)
+{
+       struct core_state *cs = &lcore_states[lcore];
+
+       return __atomic_load_n(&cs->loops, __ATOMIC_RELAXED);
+}
+
+static uint64_t
+lcore_attr_get_service_calls(uint32_t service_id, unsigned int lcore)
+{
+       struct core_state *cs = &lcore_states[lcore];
+
+       return __atomic_load_n(&cs->calls_per_service[service_id],
+                              __ATOMIC_RELAXED);
+}
+
+static uint64_t
+lcore_attr_get_service_cycles(uint32_t service_id, unsigned int lcore)
+{
+       struct core_state *cs = &lcore_states[lcore];
+
+       return __atomic_load_n(&cs->cycles_per_service[service_id],
+                              __ATOMIC_RELAXED);
+}
+
+typedef uint64_t (*lcore_attr_get_fun)(uint32_t service_id,
+                                      unsigned int lcore);
+
+static uint64_t
+attr_get(uint32_t id, lcore_attr_get_fun lcore_attr_get)
+{
+       unsigned int lcore;
+       uint64_t sum = 0;
+
+       for (lcore = 0; lcore < RTE_MAX_LCORE; lcore++)
+               if (lcore_states[lcore].is_service_core)
+                       sum += lcore_attr_get(id, lcore);
+
+       return sum;
+}
+
+static uint64_t
+attr_get_service_calls(uint32_t service_id)
+{
+       return attr_get(service_id, lcore_attr_get_service_calls);
+}
+
+static uint64_t
+attr_get_service_cycles(uint32_t service_id)
+{
+       return attr_get(service_id, lcore_attr_get_service_cycles);
+}
+
 int32_t
 rte_service_attr_get(uint32_t id, uint32_t attr_id, uint64_t *attr_value)
 {
-       struct rte_service_spec_impl *s;
-       SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);
+       if (!service_valid(id))
+               return -EINVAL;
 
        if (!attr_value)
                return -EINVAL;
 
        switch (attr_id) {
-       case RTE_SERVICE_ATTR_CYCLES:
-               *attr_value = s->cycles_spent;
-               return 0;
        case RTE_SERVICE_ATTR_CALL_COUNT:
-               *attr_value = s->calls;
+               *attr_value = attr_get_service_calls(id);
+               return 0;
+       case RTE_SERVICE_ATTR_CYCLES:
+               *attr_value = attr_get_service_cycles(id);
                return 0;
        default:
                return -EINVAL;
@@ -849,7 +894,7 @@ rte_service_lcore_attr_get(uint32_t lcore, uint32_t attr_id,
 
        switch (attr_id) {
        case RTE_SERVICE_LCORE_ATTR_LOOPS:
-               *attr_value = cs->loops;
+               *attr_value = lcore_attr_get_loops(lcore);
                return 0;
        default:
                return -EINVAL;
@@ -859,11 +904,18 @@ rte_service_lcore_attr_get(uint32_t lcore, uint32_t attr_id,
 int32_t
 rte_service_attr_reset_all(uint32_t id)
 {
-       struct rte_service_spec_impl *s;
-       SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);
+       unsigned int lcore;
+
+       if (!service_valid(id))
+               return -EINVAL;
+
+       for (lcore = 0; lcore < RTE_MAX_LCORE; lcore++) {
+               struct core_state *cs = &lcore_states[lcore];
+
+               cs->calls_per_service[id] = 0;
+               cs->cycles_per_service[id] = 0;
+       }
 
-       s->cycles_spent = 0;
-       s->calls = 0;
        return 0;
 }
 
@@ -885,17 +937,25 @@ rte_service_lcore_attr_reset_all(uint32_t lcore)
 }
 
 static void
-service_dump_one(FILE *f, struct rte_service_spec_impl *s)
+service_dump_one(FILE *f, uint32_t id)
 {
+       struct rte_service_spec_impl *s;
+       uint64_t service_calls;
+       uint64_t service_cycles;
+
+       service_calls = attr_get_service_calls(id);
+       service_cycles = attr_get_service_cycles(id);
+
        /* avoid divide by zero */
-       int calls = 1;
+       if (service_calls == 0)
+               service_calls = 1;
+
+       s = service_get(id);
 
-       if (s->calls != 0)
-               calls = s->calls;
        fprintf(f, "  %s: stats %d\tcalls %"PRIu64"\tcycles %"
                        PRIu64"\tavg: %"PRIu64"\n",
-                       s->spec.name, service_stats_enabled(s), s->calls,
-                       s->cycles_spent, s->cycles_spent / calls);
+                       s->spec.name, service_stats_enabled(s), service_calls,
+                       service_cycles, service_cycles / service_calls);
 }
 
 static void
@@ -906,7 +966,7 @@ service_dump_calls_per_lcore(FILE *f, uint32_t lcore)
 
        fprintf(f, "%02d\t", lcore);
        for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
-               if (!service_valid(i))
+               if (!service_registered(i))
                        continue;
                fprintf(f, "%"PRIu64"\t", cs->calls_per_service[i]);
        }
@@ -924,16 +984,16 @@ rte_service_dump(FILE *f, uint32_t id)
                struct rte_service_spec_impl *s;
                SERVICE_VALID_GET_OR_ERR_RET(id, s, -EINVAL);
                fprintf(f, "Service %s Summary\n", s->spec.name);
-               service_dump_one(f, s);
+               service_dump_one(f, id);
                return 0;
        }
 
        /* print all services, as UINT32_MAX was passed as id */
        fprintf(f, "Services Summary\n");
        for (i = 0; i < RTE_SERVICE_NUM_MAX; i++) {
-               if (!service_valid(i))
+               if (!service_registered(i))
                        continue;
-               service_dump_one(f, &rte_services[i]);
+               service_dump_one(f, i);
        }
 
        fprintf(f, "Service Cores Summary\n");
-- 
2.34.1

Reply via email to