When the application creates the timer adapter with the
`RTE_EVENT_TIMER_ADAPTER_F_SP_PUT` flag, we can optimize the arm sequence
by removing the locking overhead.

Signed-off-by: Pavan Nikhilesh <pbhagavat...@caviumnetworks.com>
---
 drivers/event/octeontx/timvf_evdev.c  |  22 +++-
 drivers/event/octeontx/timvf_evdev.h  |   5 +
 drivers/event/octeontx/timvf_worker.c |  65 ++++++++++++
 drivers/event/octeontx/timvf_worker.h | 183 ++++++++++++++++++++++++++++++++++
 4 files changed, 270 insertions(+), 5 deletions(-)
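
Note: below is a minimal sketch of how an application opts in to the
single-producer arm path (assuming the generic rte_event_timer_adapter
API; the ids and timing values are illustrative):

    #include <rte_event_timer_adapter.h>
    #include <rte_lcore.h>

    static struct rte_event_timer_adapter *
    create_sp_adapter(uint8_t evdev_id)
    {
            const struct rte_event_timer_adapter_conf conf = {
                    .event_dev_id = evdev_id,
                    .timer_adapter_id = 0,
                    .socket_id = rte_socket_id(),
                    .clk_src = RTE_EVENT_TIMER_ADAPTER_CPU_CLK,
                    .timer_tick_ns = 10 * 1000 * 1000,         /* 10 ms tick */
                    .max_tmo_ns = 180ULL * 1000 * 1000 * 1000, /* 3 min max */
                    .nb_timers = 1 << 16,
                    /* Only one lcore arms timers on this adapter, so the
                     * driver may drop the bucket locking on arm.
                     */
                    .flags = RTE_EVENT_TIMER_ADAPTER_F_SP_PUT,
            };

            return rte_event_timer_adapter_create(&conf);
    }

With the flag set, this driver installs timvf_timer_reg_burst_sp as the
arm_burst op and requests SP/SC semantics for its chunk mempool.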

diff --git a/drivers/event/octeontx/timvf_evdev.c b/drivers/event/octeontx/timvf_evdev.c
index d0ba42263..6cf5d4846 100644
--- a/drivers/event/octeontx/timvf_evdev.c
+++ b/drivers/event/octeontx/timvf_evdev.c
@@ -174,6 +174,7 @@ timvf_ring_create(struct rte_event_timer_adapter *adptr)
        struct rte_event_timer_adapter_conf *rcfg = &adptr->data->conf;
        struct timvf_ring *timr;
        struct octeontx_timvf_info tinfo;
+       unsigned int mp_flags = 0;
 
        if (octeontx_timvf_info(&tinfo) < 0)
                return -ENODEV;
@@ -224,6 +225,11 @@ timvf_ring_create(struct rte_event_timer_adapter *adptr)
 
        timr->nb_chunks = nb_timers / nb_chunk_slots;
 
+       if (rcfg->flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT) {
+               mp_flags = MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET;
+               timvf_log_info("Using single producer mode");
+       }
+
        timr->meta.bkt = rte_zmalloc("octeontx_timvf_bucket",
                        (timr->meta.nb_bkts) * sizeof(struct tim_mem_bucket),
                        0);
@@ -261,8 +267,12 @@ timvf_ring_create(struct rte_event_timer_adapter *adptr)
        timvf_write64(0x7, (uint8_t *)timr->vbar0 + TIM_VF_NRSPERR_ENA_W1C);
        timvf_write64(0x7, (uint8_t *)timr->vbar0 + TIM_VF_NRSPERR_ENA_W1S);
 
-       adptr->arm_burst = timvf_timer_reg_burst_mp;
-       adptr->arm_tmo_tick_burst = NULL;
+       if (mp_flags)
+               adptr->arm_burst = timvf_timer_reg_burst_sp;
+       else
+               adptr->arm_burst = timvf_timer_reg_burst_mp;
+
+       adptr->arm_tmo_tick_burst = timvf_timer_reg_brst;
        adptr->cancel_burst = timvf_timer_unreg_burst;
 
        return 0;
@@ -297,11 +307,13 @@ timvf_timer_adapter_caps_get(const struct rte_eventdev *dev, uint64_t flags,
                uint32_t *caps, const struct rte_event_timer_adapter_ops **ops)
 {
        RTE_SET_USED(dev);
-       RTE_SET_USED(flags);
 
-       timvf_ops.arm_burst = timvf_timer_reg_burst_mp;
-       timvf_ops.arm_tmo_tick_burst = NULL;
+       if (flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT)
+               timvf_ops.arm_burst = timvf_timer_reg_burst_sp;
+       else
+               timvf_ops.arm_burst = timvf_timer_reg_burst_mp;
 
+       timvf_ops.arm_tmo_tick_burst = timvf_timer_reg_brst;
        timvf_ops.cancel_burst = timvf_timer_unreg_burst;
        *caps = RTE_EVENT_TIMER_ADAPTER_CAP_INTERNAL_PORT;
        *ops = &timvf_ops;
diff --git a/drivers/event/octeontx/timvf_evdev.h b/drivers/event/octeontx/timvf_evdev.h
index c80e147e8..b5db233bb 100644
--- a/drivers/event/octeontx/timvf_evdev.h
+++ b/drivers/event/octeontx/timvf_evdev.h
@@ -186,8 +186,13 @@ bkt_mod(uint32_t rel_bkt, uint32_t nb_bkts)
 
 int timvf_timer_adapter_caps_get(const struct rte_eventdev *dev, uint64_t flags,
                uint32_t *caps, const struct rte_event_timer_adapter_ops **ops);
+int timvf_timer_reg_brst(const struct rte_event_timer_adapter *adptr,
+               struct rte_event_timer **tim, const uint64_t timeout_tick,
+               const uint16_t nb_timers);
 int timvf_timer_unreg_burst(const struct rte_event_timer_adapter *adptr,
                struct rte_event_timer **tim, const uint16_t nb_timers);
+int timvf_timer_reg_burst_sp(const struct rte_event_timer_adapter *adptr,
+               struct rte_event_timer **tim, const uint16_t nb_timers);
 int timvf_timer_reg_burst_mp(const struct rte_event_timer_adapter *adptr,
                struct rte_event_timer **tim, const uint16_t nb_timers);
 
diff --git a/drivers/event/octeontx/timvf_worker.c b/drivers/event/octeontx/timvf_worker.c
index 7a924fd11..3e48f3ca6 100644
--- a/drivers/event/octeontx/timvf_worker.c
+++ b/drivers/event/octeontx/timvf_worker.c
@@ -5,6 +5,42 @@
 
 #include "timvf_worker.h"
 
+int
+timvf_timer_reg_brst(const struct rte_event_timer_adapter *adptr,
+               struct rte_event_timer **tim, const uint64_t timeout_tick,
+               const uint16_t nb_timers)
+{
+       int ret;
+       uint16_t set_timers = 0;
+       uint16_t idx;
+       uint16_t arr_idx = 0;
+       struct timvf_ring *timr = adptr->data->adapter_priv;
+       struct tim_mem_entry entry[TIMVF_MAX_BURST] __rte_cache_aligned;
+
+       if (unlikely(timeout_tick > timr->meta.nb_bkts)) {
+               for (idx = 0; idx < nb_timers; idx++)
+                       tim[idx]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
+               rte_errno = EINVAL;
+               return 0;
+       }
+
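+       /* Stage and submit the timers in bursts of at most TIMVF_MAX_BURST. */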
+       while (arr_idx < nb_timers) {
+               for (idx = 0; idx < TIMVF_MAX_BURST && (arr_idx < nb_timers);
+                               idx++, arr_idx++) {
+                       entry[idx].w0 =
+                               (tim[arr_idx]->ev.event & 0xFFC000000000) >> 6 |
+                               (tim[arr_idx]->ev.event & 0xFFFFFFFF);
+                       entry[idx].wqe = tim[arr_idx]->ev.u64;
+               }
+               ret = timvf_add_entry_brst(timr, timeout_tick, &tim[set_timers],
+                               entry, idx);
+               set_timers += ret;
+               if (ret != idx)
+                       break;
+       }
+       return set_timers;
+}
+
 int
 timvf_timer_unreg_burst(const struct rte_event_timer_adapter *adptr,
                struct rte_event_timer **tim, const uint16_t nb_timers)
@@ -23,6 +59,35 @@ timvf_timer_unreg_burst(const struct rte_event_timer_adapter *adptr,
        return index;
 }
 
+int
+timvf_timer_reg_burst_sp(const struct rte_event_timer_adapter *adptr,
+               struct rte_event_timer **tim, const uint16_t nb_timers)
+{
+       int ret;
+       uint16_t index;
+       struct tim_mem_entry entry;
+       struct timvf_ring *timr = adptr->data->adapter_priv;
+       for (index = 0; index < nb_timers; index++) {
+               if (unlikely(tim[index]->timeout_ticks > timr->meta.nb_bkts)) {
+                       tim[index]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
+                       rte_errno = EINVAL;
+                       break;
+               }
+
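+               /* Translate the rte_event into the TIM mem entry format. */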
+               entry.w0 = (tim[index]->ev.event & 0xFFC000000000) >> 6 |
+                       (tim[index]->ev.event & 0xFFFFFFFF);
+               entry.wqe = tim[index]->ev.u64;
+               ret = timvf_add_entry_sp(timr, tim[index]->timeout_ticks,
+                               tim[index], &entry);
+               if (unlikely(ret)) {
+                       rte_errno = -ret;
+                       break;
+               }
+       }
+
+       return index;
+}
+
 int
 timvf_timer_reg_burst_mp(const struct rte_event_timer_adapter *adptr,
                struct rte_event_timer **tim, const uint16_t nb_timers)
diff --git a/drivers/event/octeontx/timvf_worker.h b/drivers/event/octeontx/timvf_worker.h
index b63dd763c..320eb6ac1 100644
--- a/drivers/event/octeontx/timvf_worker.h
+++ b/drivers/event/octeontx/timvf_worker.h
@@ -160,6 +160,118 @@ timr_clr_bkt(struct timvf_ring *timr, struct tim_mem_bucket *bkt)
        return (struct tim_mem_entry *)bkt->first_chunk;
 }
 
+/* Burst mode functions */
+static inline int __hot
+timvf_add_entry_brst(struct timvf_ring *timr, const uint16_t rel_bkt,
+               struct rte_event_timer **tim, const struct tim_mem_entry *ents,
+               const uint16_t nb_timers)
+{
+       int16_t rem;
+       int16_t crem = 0;
+       uint8_t lock_cnt;
+       uint16_t index = 0;
+       uint16_t chunk_remainder = 0;
+       uint32_t bucket;
+       uint32_t tbkt_id;
+       const uint32_t nb_bkts = timr->meta.nb_bkts;
+       const uint64_t start = timr->meta.ring_start_cyc;
+       uint64_t pos_reg;
+       uint64_t lock_sema;
+       struct tim_mem_bucket *bkt;
+       struct tim_mem_entry *chunk;
+
+__retry:
+       pos_reg = (rte_rdtsc() - start);
+       bucket = rte_reciprocal_divide_u64(pos_reg,
+                       &timr->meta.fast_div) + rel_bkt;
+       tbkt_id = timr->meta.get_target_bkt(bucket, nb_bkts);
+       bkt = &timr->meta.bkt[tbkt_id];
+
+       /* Claim the bucket lock; only the first claimant proceeds. */
+       lock_sema = timr_bkt_inc_lock(bkt);
+       lock_cnt = (uint8_t)
+               ((lock_sema >> TIM_BUCKET_W1_S_LOCK) & TIM_BUCKET_W1_M_LOCK);
+
+       if (lock_cnt) {
+               timr_bkt_dec_lock(bkt);
+               goto __retry;
+       }
+
+       /* Bucket related checks. */
+       if (unlikely(timr_bkt_get_shbt(lock_sema))) {
+               timr_bkt_dec_lock(bkt);
+               goto __retry;
+       }
+
+       chunk_remainder = timr_bkt_fetch_rem(lock_sema);
+       rem = chunk_remainder - nb_timers;
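+       /* rem < 0: the burst spills past the current chunk; top up the
+        * current chunk first, then link in a fresh one for the rest.
+        */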
+       if (rem < 0) {
+               crem = nb_chunk_slots - chunk_remainder;
+               if (chunk_remainder && crem) {
+                       chunk = ((struct tim_mem_entry *)bkt->current_chunk) +
+                               crem;
+                       for (; index < chunk_remainder; index++) {
+                               *chunk = *(ents + index);
+                               tim[index]->impl_opaque[0] = (uint64_t)chunk++;
+                               tim[index]->impl_opaque[1] = (uint64_t)bkt;
+                               tim[index]->state = RTE_EVENT_TIMER_ARMED;
+                       }
+                       timr_bkt_sub_rem(bkt, chunk_remainder);
+                       timr_bkt_add_nent(bkt, chunk_remainder);
+               }
+               rem = nb_timers - chunk_remainder;
+               if (bkt->nb_entry || !bkt->first_chunk) {
+                       if (unlikely(rte_mempool_get(timr->meta.chunk_pool,
+                                                       (void **)&chunk))) {
+                               /*
+                                * No more chunks, return number of entries
+                                * successfully copied.
+                                */
+                               timr_bkt_dec_lock(bkt);
+                               rte_errno = ENOMEM;
+                               tim[index]->state = RTE_EVENT_TIMER_ERROR;
+                               return index;
+                       }
+                       if (bkt->nb_entry) {
+                               *(uint64_t *)(
+                               (struct tim_mem_entry *)bkt->current_chunk +
+                                       nb_chunk_slots) = (uint64_t) chunk;
+                       } else {
+                               bkt->first_chunk = (uint64_t) chunk;
+                       }
+               } else {
+                       chunk = timr_clr_bkt(timr, bkt);
+                       bkt->first_chunk = (uint64_t) chunk;
+               }
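+               /* The word after the last slot of a chunk holds the
+                * next-chunk link; terminate the new tail chunk.
+                */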
+               *(uint64_t *)(chunk + nb_chunk_slots) = 0;
+               bkt->current_chunk = (uint64_t) chunk;
+
+               for (; index < nb_timers; index++) {
+                       *chunk = *(ents + index);
+                       tim[index]->impl_opaque[0] = (uint64_t)chunk++;
+                       tim[index]->impl_opaque[1] = (uint64_t)bkt;
+                       tim[index]->state = RTE_EVENT_TIMER_ARMED;
+               }
+               timr_bkt_set_rem(bkt, nb_chunk_slots - rem);
+               timr_bkt_add_nent(bkt, rem);
+       } else {
+               chunk = (struct tim_mem_entry *)bkt->current_chunk;
+               chunk += (nb_chunk_slots - chunk_remainder);
+               for (; index < nb_timers; index++) {
+                       *chunk = *(ents + index);
+                       tim[index]->impl_opaque[0] = (uint64_t)chunk++;
+                       tim[index]->impl_opaque[1] = (uint64_t)bkt;
+                       tim[index]->state = RTE_EVENT_TIMER_ARMED;
+               }
+               timr_bkt_sub_rem(bkt, nb_timers);
+               timr_bkt_add_nent(bkt, nb_timers);
+       }
+
+       timr_bkt_dec_lock(bkt);
+       return nb_timers;
+}
+
 static inline int __hot
 timvf_rem_entry(struct rte_event_timer *tim)
 {
@@ -192,6 +304,77 @@ timvf_rem_entry(struct rte_event_timer *tim)
        return 0;
 }
 
+/* Single producer functions. */
+static inline int __hot
+timvf_add_entry_sp(struct timvf_ring *timr, const uint32_t rel_bkt,
+               struct rte_event_timer *tim, const struct tim_mem_entry *pent)
+{
+       int16_t rem;
+       uint32_t bucket;
+       uint32_t tbkt_id;
+       const uint32_t nb_bkts = timr->meta.nb_bkts;
+       uint64_t lock_sema;
+       uint64_t pos_reg;
+       const uint64_t start = timr->meta.ring_start_cyc;
+       struct tim_mem_bucket *bkt;
+       struct tim_mem_entry *chunk;
+
+       pos_reg = (rte_rdtsc() - start);
+       bucket = rte_reciprocal_divide_u64(pos_reg,
+                       &timr->meta.fast_div) + rel_bkt;
+       tbkt_id = timr->meta.get_target_bkt(bucket, nb_bkts);
+       bkt = &timr->meta.bkt[tbkt_id];
+__retry:
+       /* Get the bucket semaphore. */
+       lock_sema = timr_bkt_fetch_sema(bkt);
+       /* Bucket related checks. */
+       if (unlikely(timr_bkt_get_shbt(lock_sema)))
+               goto __retry;
+
+       /* Insert the work. */
+       rem = timr_bkt_fetch_rem(lock_sema);
+
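+       /* rem == 0: the current chunk is full; link a new chunk from the
+        * pool, or recycle the chunk list of an already-expired bucket.
+        */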
+       if (!rem) {
+               /* SP mode will have only one thread. */
+               if (bkt->nb_entry || !bkt->first_chunk) {
+                       if (unlikely(rte_mempool_get(timr->meta.chunk_pool,
+                                                       (void **)&chunk))) {
+                               timr_bkt_set_rem(bkt, 0);
+                               tim->impl_opaque[0] =
+                                       tim->impl_opaque[1] = 0;
+                               tim->state = RTE_EVENT_TIMER_ERROR;
+                               return -ENOMEM;
+                       }
+                       if (bkt->nb_entry) {
+                               *(uint64_t *)((struct tim_mem_entry *)
+                                               bkt->current_chunk +
+                                               nb_chunk_slots) =
+                                       (uint64_t) chunk;
+                       } else {
+                               bkt->first_chunk = (uint64_t) chunk;
+                       }
+                       *(uint64_t *)(chunk + nb_chunk_slots) = 0;
+               } else {
+                       chunk = timr_clr_bkt(timr, bkt);
+                       *(uint64_t *)(chunk + nb_chunk_slots) = 0;
+                       bkt->first_chunk = (uint64_t) chunk;
+               }
+               bkt->current_chunk = (uint64_t) chunk;
+               timr_bkt_set_rem(bkt, nb_chunk_slots - 1);
+       } else {
+               chunk = (struct tim_mem_entry *)bkt->current_chunk;
+               chunk += nb_chunk_slots - rem;
+       }
+       /* Copy work entry. */
+       *chunk = *pent;
+       timr_bkt_inc_nent(bkt);
+
+       tim->impl_opaque[0] = (uint64_t)chunk;
+       tim->impl_opaque[1] = (uint64_t)bkt;
+       tim->state = RTE_EVENT_TIMER_ARMED;
+       return 0;
+}
+
 /* Multi producer functions. */
 static inline int __hot
 timvf_add_entry_mp(struct timvf_ring *timr, const uint32_t rel_bkt,
-- 
2.16.1
