The current implementation has a few problems:

- if cache->len < n, the objects are first copied from the ring into the
  cache and only then into obj_table, which is an unnecessary extra copy
  (see the condensed sketch after this list)
- if n >= cache_size (or the backfill fails) and the request can't be
  fulfilled from the ring alone, we don't try to combine the ring with
  the cache
- if the refill fails, the call fails and returns no objects, even if
  the ring has enough to satisfy the request
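
A condensed view of the current cache->len < n path, to show the double
copy (simplified from the code in the first hunk of the diff below;
error handling and bookkeeping are omitted):

        /* copy #1: backfill the cache from the ring */
        uint32_t req = n + (cache_size - cache->len);

        ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
        cache->len += req;

        /* copy #2: move the n requested objects from the cache to obj_table */
        for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
                *obj_table = cache_objs[len];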

This patch reworks the logic substantially (a sketch of the resulting
flow follows this list):
- in the first part of the function, serve the request from the cache
  only when it already holds enough objects (cache->len >= n)
- otherwise take our elements straight from the ring
- if that fails but the cache is not empty, try to serve the request
  from the ring and the cache combined
- the cache refill happens at the end, and a refill failure does not
  affect the return value
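
Putting the hunks together, the patched __mempool_get_bulk() ends up
looking roughly like this (a sketch only: the declarations of cache,
cache_objs, index, len, ret and is_mc, the cache->len -= n line and the
__MEMPOOL_STAT_ADD calls come from the unchanged surrounding code and
are assumed here; the RTE_MEMPOOL_CACHE_MAX_SIZE > 0 preprocessor
guards are dropped):

        cache = &mp->local_cache[lcore_id];
        /* cache disabled, single consumer, or not enough objects cached */
        if (unlikely(cache_size == 0 || is_mc == 0 ||
                     cache->len < n || lcore_id >= RTE_MAX_LCORE))
                goto ring_dequeue;

        /* fast path: serve the whole request from the cache */
        cache_objs = cache->objs;
        for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
                *obj_table = cache_objs[len];
        cache->len -= n;
        ret = 0;
        goto cache_refill;

ring_dequeue:
        /* take the objects straight from the ring */
        if (is_mc)
                ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
        else
                ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);

        /* the ring alone was not enough: combine ring and cache */
        if (ret < 0 && is_mc == 1 && cache->len > 0) {
                uint32_t req = n - cache->len;

                ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, req);
                if (ret == 0) {
                        obj_table += req;
                        for (index = 0; index < cache->len; ++index, ++obj_table)
                                *obj_table = cache->objs[index];
                        cache->len = 0;
                }
        }

cache_refill:
        /* best-effort refill; a failure here leaves ret untouched */
        if (ret == 0 && cache_size > 0 && cache->len < n) {
                uint32_t req = cache_size - cache->len;

                if (rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len],
                                             req) == 0)
                        cache->len += req;
        }

        return ret;

The refill block at the end is reached from both the cache path and the
ring path, which is why its outcome must not change ret.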

Signed-off-by: Zoltan Kiss <zoltan.kiss@linaro.org>
---
 lib/librte_mempool/rte_mempool.h | 63 +++++++++++++++++++++++++---------------
 1 file changed, 39 insertions(+), 24 deletions(-)

diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
index a8054e1..896946c 100644
--- a/lib/librte_mempool/rte_mempool.h
+++ b/lib/librte_mempool/rte_mempool.h
@@ -948,34 +948,14 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
        unsigned lcore_id = rte_lcore_id();
        uint32_t cache_size = mp->cache_size;

-       /* cache is not enabled or single consumer */
+       cache = &mp->local_cache[lcore_id];
+       /* cache is not enabled or single consumer or not enough */
        if (unlikely(cache_size == 0 || is_mc == 0 ||
-                    n >= cache_size || lcore_id >= RTE_MAX_LCORE))
+                    cache->len < n || lcore_id >= RTE_MAX_LCORE))
                goto ring_dequeue;

-       cache = &mp->local_cache[lcore_id];
        cache_objs = cache->objs;

-       /* Can this be satisfied from the cache? */
-       if (cache->len < n) {
-               /* No. Backfill the cache first, and then fill from it */
-               uint32_t req = n + (cache_size - cache->len);
-
-               /* How many do we require i.e. number to fill the cache + the request */
-               ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
-               if (unlikely(ret < 0)) {
-                       /*
-                        * In the offchance that we are buffer constrained,
-                        * where we are not able to allocate cache + n, go to
-                        * the ring directly. If that fails, we are truly out of
-                        * buffers.
-                        */
-                       goto ring_dequeue;
-               }
-
-               cache->len += req;
-       }
-
        /* Now fill in the response ... */
        for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
                *obj_table = cache_objs[len];
@@ -984,7 +964,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,

        __MEMPOOL_STAT_ADD(mp, get_success, n);

-       return 0;
+       ret = 0;
+       goto cache_refill;

 ring_dequeue:
 #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
@@ -995,11 +976,45 @@ ring_dequeue:
        else
                ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);

+#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
+       if (ret < 0 && is_mc == 1 && cache->len > 0) {
+               uint32_t req = n - cache->len;
+
+               ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, req);
+               if (ret == 0) {
+                       cache_objs = cache->objs;
+                       obj_table += req;
+                       for (index = 0; index < cache->len;
+                            ++index, ++obj_table)
+                               *obj_table = cache_objs[index];
+                       cache->len = 0;
+               }
+       }
+#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
+
        if (ret < 0)
                __MEMPOOL_STAT_ADD(mp, get_fail, n);
        else
                __MEMPOOL_STAT_ADD(mp, get_success, n);

+#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
+cache_refill:
+       /* If previous dequeue was OK and we have less than n, start refill */
+       if (ret == 0 && cache_size > 0 && cache->len < n) {
+               uint32_t req = cache_size - cache->len;
+
+               cache_objs = cache->objs;
+               ret = rte_ring_mc_dequeue_bulk(mp->ring,
+                                              &cache->objs[cache->len],
+                                              req);
+               if (likely(ret == 0))
+                       cache->len += req;
+               else
+                       /* Don't spoil the return value */
+                       ret = 0;
+       }
+#endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
+
        return ret;
 }

-- 
1.9.1
