If an eventdev PMD does not wish to provide event timer adapter ops
definitions, the library will fall back to a default software
implementation whose entry points are added by this commit.

Signed-off-by: Erik Gabriel Carrillo <erik.g.carri...@intel.com>
---
 lib/Makefile                                  |   2 +-
 lib/librte_eventdev/Makefile                  |   2 +-
 lib/librte_eventdev/rte_event_timer_adapter.c | 874 ++++++++++++++++++++++++++
 lib/librte_eventdev/rte_event_timer_adapter.h |  55 ++
 mk/rte.app.mk                                 |   2 +-
 5 files changed, 932 insertions(+), 3 deletions(-)

diff --git a/lib/Makefile b/lib/Makefile
index ec965a6..965be6c 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -31,7 +31,7 @@ DEPDIRS-librte_security := librte_eal librte_mempool librte_ring librte_mbuf
 DEPDIRS-librte_security += librte_ether
 DEPDIRS-librte_security += librte_cryptodev
 DIRS-$(CONFIG_RTE_LIBRTE_EVENTDEV) += librte_eventdev
-DEPDIRS-librte_eventdev := librte_eal librte_ring librte_ether librte_hash
+DEPDIRS-librte_eventdev := librte_eal librte_ring librte_ether librte_hash librte_mempool librte_timer
 DIRS-$(CONFIG_RTE_LIBRTE_RAWDEV) += librte_rawdev
 DEPDIRS-librte_rawdev := librte_eal librte_ether
 DIRS-$(CONFIG_RTE_LIBRTE_VHOST) += librte_vhost
diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile
index 8b16e3f..297df4a 100644
--- a/lib/librte_eventdev/Makefile
+++ b/lib/librte_eventdev/Makefile
@@ -14,7 +14,7 @@ LIBABIVER := 3
 CFLAGS += -DALLOW_EXPERIMENTAL_API
 CFLAGS += -O3
 CFLAGS += $(WERROR_FLAGS)
-LDLIBS += -lrte_eal -lrte_ring -lrte_ethdev -lrte_hash
+LDLIBS += -lrte_eal -lrte_ring -lrte_ethdev -lrte_hash -lrte_mempool -lrte_timer
 
 # library source files
 SRCS-y += rte_eventdev.c
diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c b/lib/librte_eventdev/rte_event_timer_adapter.c
index 711d6b9..a35f233 100644
--- a/lib/librte_eventdev/rte_event_timer_adapter.c
+++ b/lib/librte_eventdev/rte_event_timer_adapter.c
@@ -4,11 +4,20 @@
  */
 
 #include <string.h>
+#include <stdbool.h>
+#include <sys/queue.h>
 
 #include <rte_memzone.h>
 #include <rte_memory.h>
 #include <rte_dev.h>
 #include <rte_errno.h>
+#include <rte_malloc.h>
+#include <rte_ring.h>
+#include <rte_mempool.h>
+#include <rte_common.h>
+#include <rte_timer.h>
+#include <rte_service_component.h>
+#include <rte_cycles.h>
 
 #include "rte_eventdev.h"
 #include "rte_eventdev_pmd.h"
@@ -19,9 +28,13 @@
 #define DATA_MZ_NAME_FORMAT "rte_event_timer_adapter_data_%d"
 
 static int evtim_logtype;
+static int evtim_svc_logtype;
+static int evtim_buffer_logtype;
 
 static struct rte_event_timer_adapter adapters[RTE_EVENT_TIMER_ADAPTER_NUM_MAX];
 
+const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops;
+
 static inline int
 adapter_valid(const struct rte_event_timer_adapter *adapter)
 {
@@ -38,8 +51,14 @@ adapter_valid(const struct rte_event_timer_adapter *adapter)
 #ifdef RTE_LIBRTE_EVENTDEV_DEBUG
 #define EVTIM_LOG_DBG(...) \
        EVTIM_LOG(DEBUG, evtim_logtype, __VA_ARGS__)
+#define EVTIM_BUF_LOG_DBG(...) \
+       EVTIM_LOG(DEBUG, evtim_buffer_logtype, __VA_ARGS__)
+#define EVTIM_SVC_LOG_DBG(...) \
+       EVTIM_LOG(DEBUG, evtim_svc_logtype, __VA_ARGS__)
 #else
 #define EVTIM_LOG_DBG(...) (void)0
+#define EVTIM_BUF_LOG_DBG(...) (void)0
+#define EVTIM_SVC_LOG_DBG(...) (void)0
 #endif
 
 #define ADAPTER_VALID_OR_ERR_RET(adapter, retval) do { \
@@ -210,6 +229,12 @@ rte_event_timer_adapter_create_ext(
                }
        }
 
+       /* If eventdev PMD did not provide ops, use default software
+        * implementation.
+        */
+       if (adapter->ops == NULL)
+               adapter->ops = &sw_event_adapter_timer_ops;
+
        /* Allow driver to do some setup */
        FUNC_PTR_OR_NULL_RET_WITH_ERRNO(adapter->ops->init, -ENOTSUP);
        ret = adapter->ops->init(adapter);
@@ -327,6 +352,12 @@ rte_event_timer_adapter_lookup(uint16_t adapter_id)
                return NULL;
        }
 
+       /* If eventdev PMD did not provide ops, use default software
+        * implementation.
+        */
+       if (adapter->ops == NULL)
+               adapter->ops = &sw_event_adapter_timer_ops;
+
        /* Set fast-path function pointers */
        adapter->arm_burst = adapter->ops->arm_burst;
        adapter->arm_tmo_tick_burst = adapter->ops->arm_tmo_tick_burst;
@@ -449,6 +480,840 @@ rte_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
        return adapter->cancel_burst(adapter, evtims, nb_evtims);
 }
 
+/*
+ * Software event timer adapter buffer helper functions
+ */
+
+/* Nanoseconds per second.  Written as a floating-point literal, so any
+ * expression it takes part in is evaluated in double precision.
+ */
+#define NSECPERSEC 1E9
+
+/* Optimizations used to index into the buffer require that the buffer size
+ * be a power of 2.
+ */
+#define EVENT_BUFFER_SZ        1024
+
+/* Number of buffered events at which event_buffer_batch_ready() reports that
+ * a batch is available.
+ */
+#define EVENT_BUFFER_BATCHSZ 32
+
+/* Fixed-size circular buffer of events.  head and tail are free-running
+ * counters; the helpers below reduce them to array offsets by masking with
+ * (EVENT_BUFFER_SZ - 1).
+ */
+struct event_buffer {
+       uint16_t head;  /* incremented by event_buffer_add() */
+       uint16_t tail;  /* advanced by event_buffer_flush() */
+       struct rte_event events[EVENT_BUFFER_SZ];
+} __rte_cache_aligned;
+
+/* Return true when the buffer holds EVENT_BUFFER_SZ events.
+ *
+ * head and tail are free-running uint16_t counters.  Both are promoted to
+ * int before the subtraction, so the difference is truncated back to
+ * uint16_t here; without the cast it turns negative once head has wrapped
+ * past UINT16_MAX while tail has not, and a full buffer goes undetected.
+ */
+static inline bool
+event_buffer_full(struct event_buffer *bufp)
+{
+       return (uint16_t)(bufp->head - bufp->tail) == EVENT_BUFFER_SZ;
+}
+
+/* Return true when at least EVENT_BUFFER_BATCHSZ events are buffered.
+ *
+ * The counter difference is truncated to uint16_t because the operands are
+ * promoted to int; without the cast the difference is negative after head
+ * wraps past UINT16_MAX and a ready batch would never be reported.
+ */
+static inline bool
+event_buffer_batch_ready(struct event_buffer *bufp)
+{
+       return (uint16_t)(bufp->head - bufp->tail) >= EVENT_BUFFER_BATCHSZ;
+}
+
+/* Reset the buffer to the empty state and zero its event storage. */
+static void
+event_buffer_init(struct event_buffer *bufp)
+{
+       bufp->tail = 0;
+       bufp->head = 0;
+       memset(bufp->events, 0, sizeof(bufp->events));
+}
+
+/* Copy *eventp into the next free slot of the buffer.
+ *
+ * Returns 0 on success, or -1 if the buffer is full (nothing is copied).
+ */
+static int
+event_buffer_add(struct event_buffer *bufp, struct rte_event *eventp)
+{
+       uint16_t slot;
+
+       if (event_buffer_full(bufp))
+               return -1;
+
+       /* The size is a power of 2, so masking replaces a modulus. */
+       slot = bufp->head & (EVENT_BUFFER_SZ - 1);
+       rte_memcpy(&bufp->events[slot], eventp, sizeof(bufp->events[slot]));
+
+       /* head is a free-running counter; uint16_t overflow wraps it. */
+       bufp->head++;
+
+       return 0;
+}
+
+/* Enqueue the largest contiguous run of buffered events to the event device.
+ *
+ * @param bufp               buffer to drain
+ * @param dev_id             event device to enqueue to
+ * @param port_id            event port to enqueue through
+ * @param nb_events_flushed  [out] number of events the device accepted
+ * @param nb_events_inv      [out] number of events dropped as invalid
+ *
+ * Both outputs are written on every path, including the empty-buffer one, so
+ * callers can add them to their statistics unconditionally.
+ */
+static void
+event_buffer_flush(struct event_buffer *bufp, uint8_t dev_id, uint8_t port_id,
+                  uint16_t *nb_events_flushed,
+                  uint16_t *nb_events_inv)
+{
+       uint16_t count, tail_offset, n;
+       struct rte_event *events = bufp->events;
+
+       *nb_events_flushed = 0;
+       *nb_events_inv = 0;
+
+       /* head and tail are free-running counters; truncating the
+        * difference to uint16_t gives the fill level even after head has
+        * wrapped.
+        */
+       count = (uint16_t)(bufp->head - bufp->tail);
+       if (count == 0)
+               return; /* buffer empty */
+
+       /* Instead of modulus, bitwise AND with mask to get offset. */
+       tail_offset = bufp->tail & (EVENT_BUFFER_SZ - 1);
+
+       /* Determine the largest contiguous run we can attempt to enqueue to
+        * the event device: never more than is buffered, and never past the
+        * end of the events array.
+        */
+       n = EVENT_BUFFER_SZ - tail_offset;
+       if (n > count)
+               n = count;
+
+       *nb_events_flushed = rte_event_enqueue_burst(dev_id, port_id,
+                                                    &events[tail_offset], n);
+       if (*nb_events_flushed != n && rte_errno == -EINVAL) {
+               EVTIM_LOG_ERR("failed to enqueue invalid event - dropping it");
+               /* Count the rejected event so that tail skips over it. */
+               (*nb_events_inv)++;
+       }
+
+       EVTIM_BUF_LOG_DBG("event buffer flush: tried = %"PRIu16", "
+                         "succeeded = %"PRIu16, n, *nb_events_flushed);
+
+       /* Wrap automatically when overflow occurs */
+       bufp->tail = bufp->tail + *nb_events_flushed + *nb_events_inv;
+}
+
+/*
+ * Software event timer adapter implementation
+ */
+
+/* Private state of one software event timer adapter instance; allocated in
+ * sw_event_timer_adapter_init() and reached through
+ * adapter->data->adapter_priv.
+ */
+struct rte_event_timer_adapter_sw_data {
+       /* List of messages for outstanding timers */
+       TAILQ_HEAD(, msg) msgs_tailq_head;
+       /* Lock to guard tailq and armed count.
+        * NOTE(review): no armed count field is visible in this struct, and
+        * only the uninit path takes this lock - confirm intended coverage.
+        */
+       rte_spinlock_t msgs_tailq_sl;
+       /* Identifier of service executing timer management logic. */
+       uint32_t service_id;
+       /* The cycle count at which the adapter should next tick */
+       uint64_t next_tick_cycles;
+       /* Ring containing messages to arm or cancel event timers */
+       struct rte_ring *msg_ring;
+       /* Mempool containing msg objects */
+       struct rte_mempool *msg_pool;
+       /* Buffered timer expiry events to be enqueued to an event device. */
+       struct event_buffer buffer;
+       /* Statistics */
+       struct rte_event_timer_adapter_stats stats;
+       /* Incremented as the service moves through phases of an iteration:
+        * the service function sets it to 1 while draining the message ring,
+        * 2 while managing timers, and 0 when the iteration is done.
+        */
+       volatile int service_phase;
+       /* The number of threads currently adding to the message ring */
+       rte_atomic16_t message_producer_count;
+};
+
+/* Request carried by a msg: arm or cancel an event timer. */
+enum msg_type {MSG_TYPE_ARM, MSG_TYPE_CANCEL};
+
+/* Message passed from the arm/cancel burst functions to the service function
+ * through the message ring.  For an ARM request the embedded rte_timer is
+ * the timer that gets started, and the msg is linked into the adapter's
+ * tailq until the timer expires or is canceled.
+ */
+struct msg {
+       enum msg_type type;
+       struct rte_event_timer *evtim;  /* event timer the request is for */
+       struct rte_timer tim;           /* initialized by the service on ARM */
+       TAILQ_ENTRY(msg) msgs;          /* linkage in msgs_tailq_head */
+};
+
+/* rte_timer expiry callback for an armed event timer.
+ *
+ * @param tim  the expired rte_timer, embedded in the struct msg that armed it
+ * @param arg  the struct rte_event_timer that was armed
+ *
+ * Buffers the timer's event.  On success the event timer goes back to
+ * RTE_EVENT_TIMER_NOT_ARMED and its msg object is unlinked and returned to
+ * the pool.  If the buffer is full the rte_timer is re-armed with a zero
+ * timeout instead.  Finally, a full batch of buffered events is flushed to
+ * the event device.
+ */
+static void
+sw_event_timer_cb(struct rte_timer *tim, void *arg)
+{
+       uint16_t nb_evs_flushed, nb_evs_invalid;
+       int ret;
+       struct rte_event_timer *evtim;
+       struct rte_event_timer_adapter *adapter;
+       struct rte_event_timer_adapter_sw_data *sw_data;
+
+       evtim = arg;
+       adapter = (struct rte_event_timer_adapter *)evtim->impl_opaque[1];
+       sw_data = adapter->data->adapter_priv;
+
+       ret = event_buffer_add(&sw_data->buffer, &evtim->ev);
+       if (ret < 0) {
+               /* If event buffer is full, put timer back in list with
+                * immediate expiry value, so that we process it again on the
+                * next iteration.
+                *
+                * The ticks argument (0) precedes the timer type (SINGLE);
+                * the two were previously passed in swapped order.
+                */
+               rte_timer_reset_sync(tim, 0, SINGLE, rte_lcore_id(),
+                                    sw_event_timer_cb, evtim);
+
+               sw_data->stats.evtim_retry_count++;
+               EVTIM_LOG_DBG("event buffer full, resetting rte_timer with "
+                             "immediate expiry value");
+       } else {
+               struct msg *m = container_of(tim, struct msg, tim);
+               TAILQ_REMOVE(&sw_data->msgs_tailq_head, m, msgs);
+               EVTIM_BUF_LOG_DBG("buffered an event timer expiry event");
+               evtim->state = RTE_EVENT_TIMER_NOT_ARMED;
+
+               /* Free the msg object containing the rte_timer now that
+                * we've buffered its event successfully.
+                */
+               rte_mempool_put(sw_data->msg_pool, m);
+       }
+
+       if (event_buffer_batch_ready(&sw_data->buffer)) {
+               event_buffer_flush(&sw_data->buffer,
+                                  adapter->data->event_dev_id,
+                                  adapter->data->event_port_id,
+                                  &nb_evs_flushed,
+                                  &nb_evs_invalid);
+
+               sw_data->stats.ev_enq_count += nb_evs_flushed;
+               sw_data->stats.ev_inv_count += nb_evs_invalid;
+       }
+}
+
+/* Convert an event timer's timeout (in adapter ticks) to timer cycles.
+ *
+ * The nanosecond timeout is split into whole seconds and a sub-second
+ * remainder before scaling by the timer frequency.  Multiplying the full
+ * nanosecond value by the frequency first overflows 64 bits for timeouts of
+ * only a few seconds at GHz timer rates.
+ */
+static __rte_always_inline uint64_t
+get_timeout_cycles(struct rte_event_timer *evtim,
+                  struct rte_event_timer_adapter *adapter)
+{
+       const uint64_t nsec_per_sec = 1000000000ULL;
+       uint64_t timeout_ns, hz;
+
+       timeout_ns = evtim->timeout_ticks * adapter->data->conf.timer_tick_ns;
+       hz = rte_get_timer_hz();
+
+       return (timeout_ns / nsec_per_sec) * hz +
+              (timeout_ns % nsec_per_sec) * hz / nsec_per_sec;
+}
+
+/* This function returns true if one or more (adapter) ticks have occurred
+ * since the last time it was called.  When it returns true it also records,
+ * in sw_data->next_tick_cycles, the cycle count of the next tick boundary.
+ */
+static inline bool
+adapter_did_tick(struct rte_event_timer_adapter *adapter)
+{
+       uint64_t cycles_per_adapter_tick, start_cycles;
+       uint64_t *next_tick_cyclesp;
+       struct rte_event_timer_adapter_sw_data *sw_data;
+
+       sw_data = adapter->data->adapter_priv;
+       next_tick_cyclesp = &sw_data->next_tick_cycles;
+
+       /* NSECPERSEC is a floating-point constant, so this product is
+        * computed in double precision and then converted to uint64_t.
+        * NOTE(review): if the result converts to 0 the modulus below
+        * divides by zero - confirm the configuration rules this out.
+        */
+       cycles_per_adapter_tick = adapter->data->conf.timer_tick_ns *
+                       (rte_get_timer_hz() / NSECPERSEC);
+
+       start_cycles = rte_get_timer_cycles();
+
+       /* Note: initially, *next_tick_cyclesp == 0, so the clause below will
+        * execute, and set things going.
+        */
+
+       if (start_cycles >= *next_tick_cyclesp) {
+               /* Snap the current cycle count to the preceding adapter tick
+                * boundary.
+                */
+               start_cycles -= start_cycles % cycles_per_adapter_tick;
+
+               *next_tick_cyclesp = start_cycles + cycles_per_adapter_tick;
+
+               return true;
+       }
+
+       return false;
+}
+
+/* Validate an event timer's timeout against the adapter configuration.
+ *
+ * Returns 0 if the timeout is in range, -1 if it exceeds max_tmo_ns, and -2
+ * if it is shorter than one adapter tick.
+ */
+static __rte_always_inline int
+check_timeout(struct rte_event_timer *evtim,
+             const struct rte_event_timer_adapter *adapter)
+{
+       uint64_t timeout_ns;
+
+       timeout_ns = evtim->timeout_ticks * adapter->data->conf.timer_tick_ns;
+
+       if (timeout_ns > adapter->data->conf.max_tmo_ns)
+               return -1;
+
+       return timeout_ns < adapter->data->conf.timer_tick_ns ? -2 : 0;
+}
+
+/* Check that event timer event queue sched type matches destination event
+ * queue sched type.
+ *
+ * Returns 0 if the event may be enqueued to its destination queue, -1
+ * otherwise.
+ */
+static __rte_always_inline int
+check_destination_event_queue(struct rte_event_timer *evtim,
+                             const struct rte_event_timer_adapter *adapter)
+{
+       int ret;
+       uint32_t sched_type = 0;
+
+       ret = rte_event_queue_attr_get(adapter->data->event_dev_id,
+                                      evtim->ev.queue_id,
+                                      RTE_EVENT_QUEUE_ATTR_SCHEDULE_TYPE,
+                                      &sched_type);
+
+       /* -EOVERFLOW is tolerated, as before.  sched_type is not relied on
+        * in that case, so it must not be compared: previously it was read
+        * uninitialized here and the result was arbitrary.
+        */
+       if (ret == -EOVERFLOW)
+               return 0;
+
+       if (ret < 0 || evtim->ev.sched_type != sched_type)
+               return -1;
+
+       return 0;
+}
+
+/* Maximum number of messages dequeued from the message ring per burst. */
+#define NB_OBJS 32
+
+/* Service callback: one iteration of the software adapter's timer logic.
+ *
+ * @param arg  the struct rte_event_timer_adapter registered as userdata
+ * @return     always 0
+ *
+ * Phase 1 drains the message ring, starting an rte_timer for each ARM
+ * message and stopping one for each CANCEL message.  Phase 2 runs only when
+ * an adapter tick has elapsed: it runs rte_timer_manage() and flushes
+ * buffered expiry events.  service_phase is published after each transition
+ * so that the arm/cancel/stop paths can observe it.
+ */
+static int
+sw_event_timer_adapter_service_func(void *arg)
+{
+       int ret, i, num_msgs;
+       uint64_t cycles;
+       /* Zeroed so the stats updates below never add an indeterminate value
+        * if the flush returns early without writing its outputs.
+        */
+       uint16_t nb_evs_flushed = 0, nb_evs_invalid = 0;
+       struct rte_event_timer_adapter *adapter;
+       struct rte_event_timer_adapter_sw_data *sw_data;
+       struct rte_event_timer *evtim = NULL;
+       struct rte_timer *tim = NULL;
+       struct msg *msg, *msgs[NB_OBJS];
+
+       /* ret is only inspected by RTE_ASSERT() */
+       RTE_SET_USED(ret);
+
+       adapter = arg;
+       sw_data = adapter->data->adapter_priv;
+
+       sw_data->service_phase = 1;
+       rte_smp_wmb();
+
+       /* Keep draining while any producer is still adding messages */
+       while (rte_atomic16_read(&sw_data->message_producer_count) > 0 ||
+              !rte_ring_empty(sw_data->msg_ring)) {
+               num_msgs = rte_ring_dequeue_burst(sw_data->msg_ring,
+                                                 (void **)msgs, NB_OBJS, NULL);
+
+               for (i = 0; i < num_msgs; i++) {
+                       msg = msgs[i];
+                       evtim = msg->evtim;
+
+                       switch (msg->type) {
+                       case MSG_TYPE_ARM:
+                               EVTIM_SVC_LOG_DBG("dequeued ARM message from "
+                                                 "ring");
+                               tim = &msg->tim;
+                               rte_timer_init(tim);
+                               cycles = get_timeout_cycles(evtim,
+                                                           adapter);
+                               ret = rte_timer_reset(tim, cycles, SINGLE,
+                                                     rte_lcore_id(),
+                                                     sw_event_timer_cb,
+                                                     evtim);
+                               RTE_ASSERT(ret == 0);
+
+                               /* Remembered for the cancel path and the
+                                * expiry callback.
+                                */
+                               evtim->impl_opaque[0] = (uintptr_t)tim;
+                               evtim->impl_opaque[1] = (uintptr_t)adapter;
+
+                               TAILQ_INSERT_TAIL(&sw_data->msgs_tailq_head,
+                                                 msg,
+                                                 msgs);
+                               break;
+                       case MSG_TYPE_CANCEL:
+                               EVTIM_SVC_LOG_DBG("dequeued CANCEL message "
+                                                 "from ring");
+                               tim = (struct rte_timer *)evtim->impl_opaque[0];
+                               RTE_ASSERT(tim != NULL);
+
+                               ret = rte_timer_stop(tim);
+                               RTE_ASSERT(ret == 0);
+
+                               /* Free the msg object for the original arm
+                                * request.
+                                */
+                               struct msg *m;
+                               m = container_of(tim, struct msg, tim);
+                               TAILQ_REMOVE(&sw_data->msgs_tailq_head, m,
+                                            msgs);
+                               rte_mempool_put(sw_data->msg_pool, m);
+
+                               /* Free the msg object for the current msg */
+                               rte_mempool_put(sw_data->msg_pool, msg);
+
+                               evtim->impl_opaque[0] = 0;
+                               evtim->impl_opaque[1] = 0;
+
+                               break;
+                       }
+               }
+       }
+
+       sw_data->service_phase = 2;
+       rte_smp_wmb();
+
+       if (adapter_did_tick(adapter)) {
+               rte_timer_manage();
+
+               event_buffer_flush(&sw_data->buffer,
+                                  adapter->data->event_dev_id,
+                                  adapter->data->event_port_id,
+                                  &nb_evs_flushed, &nb_evs_invalid);
+
+               sw_data->stats.ev_enq_count += nb_evs_flushed;
+               sw_data->stats.ev_inv_count += nb_evs_invalid;
+               sw_data->stats.adapter_tick_count++;
+       }
+
+       sw_data->service_phase = 0;
+       rte_smp_wmb();
+
+       return 0;
+}
+
+/* Pick a per-lcore cache size for the msg mempool.
+ *
+ * The adapter initialization function rounds the mempool size up to the next
+ * power of 2, so the difference between that value (nb_actual) and what the
+ * user requested (nb_requested) can be spent on caches.  This avoids a
+ * scenario where a user can't arm the number of timers the adapter was
+ * configured with because mempool objects have been lost to caches.
+ *
+ * nb_actual should always be a power of 2, so candidate sizes are the
+ * successive powers of 2; the largest one that satisfies every limit is
+ * returned, or 0 if not even a size of 1 does.
+ */
+static inline int
+compute_msg_mempool_cache_size(uint64_t nb_requested, uint64_t nb_actual)
+{
+       int candidate = 1;
+       int best = 0;
+
+       while (RTE_MAX_LCORE * candidate < (int)(nb_actual - nb_requested) &&
+              candidate < RTE_MEMPOOL_CACHE_MAX_SIZE &&
+              candidate <= nb_actual / 1.5) {
+               best = candidate;
+               candidate <<= 1;
+       }
+
+       return best;
+}
+
+/* Software adapter "init" op: allocate and wire up everything one adapter
+ * instance needs.
+ *
+ * Allocates the private data area, creates the message ring and the msg
+ * mempool, initializes the event buffer, registers the service component
+ * and, on first use, initializes the rte_timer subsystem.
+ *
+ * Returns 0 on success.  On failure returns -1 with rte_errno set (ENOMEM or
+ * ENOSPC) after releasing whatever had been created up to that point.
+ */
+static int
+sw_event_timer_adapter_init(struct rte_event_timer_adapter *adapter)
+{
+       int ret;
+       struct rte_event_timer_adapter_sw_data *sw_data;
+       uint64_t nb_timers;
+       unsigned int flags;
+       struct rte_service_spec service;
+       static bool timer_subsystem_inited; /* static storage: starts false */
+
+       /* Allocate storage for SW implementation data */
+       char priv_data_name[RTE_RING_NAMESIZE];
+       snprintf(priv_data_name, RTE_RING_NAMESIZE, "sw_evtim_adap_priv_%"PRIu8,
+                adapter->data->id);
+       adapter->data->adapter_priv = rte_zmalloc_socket(
+                               priv_data_name,
+                               sizeof(struct rte_event_timer_adapter_sw_data),
+                               RTE_CACHE_LINE_SIZE,
+                               adapter->data->socket_id);
+       if (adapter->data->adapter_priv == NULL) {
+               EVTIM_LOG_ERR("failed to allocate space for private data");
+               rte_errno = ENOMEM;
+               return -1;
+       }
+
+       sw_data = adapter->data->adapter_priv;
+
+       TAILQ_INIT(&sw_data->msgs_tailq_head);
+       rte_spinlock_init(&sw_data->msgs_tailq_sl);
+       rte_atomic16_init(&sw_data->message_producer_count);
+
+       /* Rings require power of 2, so round up to next such value */
+       nb_timers = rte_align64pow2(adapter->data->conf.nb_timers);
+
+       /* The service is the only consumer of the ring; producers are
+        * treated as single-producer only when the SP_PUT flag is set.
+        */
+       char msg_ring_name[RTE_RING_NAMESIZE];
+       snprintf(msg_ring_name, RTE_RING_NAMESIZE,
+                "sw_evtim_adap_msg_ring_%"PRIu8, adapter->data->id);
+       flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ?
+               RING_F_SP_ENQ | RING_F_SC_DEQ :
+               RING_F_SC_DEQ;
+       sw_data->msg_ring = rte_ring_create(msg_ring_name, nb_timers,
+                                           adapter->data->socket_id, flags);
+       if (sw_data->msg_ring == NULL) {
+               EVTIM_LOG_ERR("failed to create message ring");
+               rte_errno = ENOMEM;
+               goto free_priv_data;
+       }
+
+       char pool_name[RTE_RING_NAMESIZE];
+       snprintf(pool_name, RTE_RING_NAMESIZE, "sw_evtim_adap_msg_pool_%"PRIu8,
+                adapter->data->id);
+
+       /* Both the arming/canceling thread and the service thread will do puts
+        * to the mempool, but if the SP_PUT flag is enabled, we can specify
+        * single-consumer get for the mempool.
+        */
+       flags = adapter->data->conf.flags & RTE_EVENT_TIMER_ADAPTER_F_SP_PUT ?
+               MEMPOOL_F_SC_GET : 0;
+
+       /* The usable size of a ring is count - 1, so subtract one here to
+        * make the counts agree.
+        */
+       int pool_size = nb_timers - 1;
+       int cache_size = compute_msg_mempool_cache_size(
+                               adapter->data->conf.nb_timers, nb_timers);
+       sw_data->msg_pool = rte_mempool_create(pool_name, pool_size,
+                                              sizeof(struct msg), cache_size,
+                                              0, NULL, NULL, NULL, NULL,
+                                              adapter->data->socket_id, flags);
+       if (sw_data->msg_pool == NULL) {
+               EVTIM_LOG_ERR("failed to create message object mempool");
+               rte_errno = ENOMEM;
+               goto free_msg_ring;
+       }
+
+       event_buffer_init(&sw_data->buffer);
+
+       /* Register a service component to run adapter logic */
+       memset(&service, 0, sizeof(service));
+       snprintf(service.name, RTE_SERVICE_NAME_MAX,
+                "sw_evimer_adap_svc_%"PRIu8, adapter->data->id);
+       service.socket_id = adapter->data->socket_id;
+       service.callback = sw_event_timer_adapter_service_func;
+       service.callback_userdata = adapter;
+       /* Explicitly clear the MT_SAFE capability bit */
+       service.capabilities &= ~(RTE_SERVICE_CAP_MT_SAFE);
+       ret = rte_service_component_register(&service, &sw_data->service_id);
+       if (ret < 0) {
+               EVTIM_LOG_ERR("failed to register service %s with id %"PRIu32
+                             ": err = %d", service.name, sw_data->service_id,
+                             ret);
+
+               rte_errno = ENOSPC;
+               goto free_msg_pool;
+       }
+
+       EVTIM_LOG_DBG("registered service %s with id %"PRIu32, service.name,
+                     sw_data->service_id);
+
+       adapter->data->service_id = sw_data->service_id;
+       adapter->data->service_inited = 1;
+
+       /* The timer subsystem is initialized once, by the first adapter */
+       if (!timer_subsystem_inited) {
+               rte_timer_subsystem_init();
+               timer_subsystem_inited = true;
+       }
+
+       return 0;
+
+       /* Error unwinding: release resources in reverse order of creation.
+        * NOTE(review): adapter->data->adapter_priv still holds the freed
+        * pointer after this path - confirm no caller reads it.
+        */
+free_msg_pool:
+       rte_mempool_free(sw_data->msg_pool);
+free_msg_ring:
+       rte_ring_free(sw_data->msg_ring);
+free_priv_data:
+       rte_free(sw_data);
+       return -1;
+}
+
+/* Software adapter "uninit" op: stop outstanding timers and release the
+ * adapter's resources.
+ *
+ * Returns 0 on success, or the negative error code from
+ * rte_service_component_unregister() if the service cannot be unregistered;
+ * in that case the ring, mempool and private data are left allocated.
+ */
+static int
+sw_event_timer_adapter_uninit(struct rte_event_timer_adapter *adapter)
+{
+       int ret;
+       struct msg *m;
+       struct rte_event_timer_adapter_sw_data *sw_data =
+                                               adapter->data->adapter_priv;
+
+       rte_spinlock_lock(&sw_data->msgs_tailq_sl);
+
+       /* Cancel outstanding rte_timers and free msg objects.  Each msg is
+        * unlinked before it goes back to the pool, so the list never refers
+        * to freed objects if the unregister step below fails and this
+        * function returns early.
+        */
+       while ((m = TAILQ_FIRST(&sw_data->msgs_tailq_head)) != NULL) {
+               EVTIM_LOG_DBG("freeing outstanding timer");
+
+               TAILQ_REMOVE(&sw_data->msgs_tailq_head, m, msgs);
+               rte_timer_stop_sync(&m->tim);
+               rte_mempool_put(sw_data->msg_pool, m);
+       }
+
+       rte_spinlock_unlock(&sw_data->msgs_tailq_sl);
+
+       ret = rte_service_component_unregister(sw_data->service_id);
+       if (ret < 0) {
+               EVTIM_LOG_ERR("failed to unregister service component");
+               return ret;
+       }
+
+       rte_ring_free(sw_data->msg_ring);
+       rte_mempool_free(sw_data->msg_pool);
+       rte_free(adapter->data->adapter_priv);
+
+       return 0;
+}
+
+/* Count the service lcores that the given service is mapped to. */
+static inline int32_t
+get_mapped_count_for_service(uint32_t service_id)
+{
+       uint32_t lcores[RTE_MAX_LCORE];
+       int32_t nb_lcores, idx;
+       int32_t nb_mapped = 0;
+
+       nb_lcores = rte_service_lcore_list(lcores, RTE_MAX_LCORE);
+
+       for (idx = 0; idx < nb_lcores; idx++) {
+               if (rte_service_map_lcore_get(service_id, lcores[idx]) != 1)
+                       continue;
+
+               nb_mapped++;
+       }
+
+       return nb_mapped;
+}
+
+/* Software adapter "start" op.
+ *
+ * Sets the service component's runstate to 1, provided exactly one service
+ * core is mapped to the service.  Returns -ENOENT if no core is mapped,
+ * -ENOTSUP if more than one is, and otherwise the result of
+ * rte_service_component_runstate_set().
+ */
+static int
+sw_event_timer_adapter_start(const struct rte_event_timer_adapter *adapter)
+{
+       struct rte_event_timer_adapter_sw_data *sw_data =
+                                               adapter->data->adapter_priv;
+       int nb_mapped;
+
+       /* Mapping the service to more than one service core can introduce
+        * delays while one thread is waiting to acquire a lock, so only allow
+        * one core to be mapped to the service.
+        */
+       nb_mapped = get_mapped_count_for_service(sw_data->service_id);
+
+       if (nb_mapped < 1)
+               return -ENOENT;
+
+       if (nb_mapped > 1)
+               return -ENOTSUP;
+
+       return rte_service_component_runstate_set(sw_data->service_id, 1);
+}
+
+/* Software adapter "stop" op.
+ *
+ * Sets the service component's runstate to 0, then busy-waits until the
+ * service function reports phase 0.  Returns 0 on success or the negative
+ * error code from rte_service_component_runstate_set().
+ */
+static int
+sw_event_timer_adapter_stop(const struct rte_event_timer_adapter *adapter)
+{
+       int ret;
+       struct rte_event_timer_adapter_sw_data *sw_data =
+                                               adapter->data->adapter_priv;
+
+       ret = rte_service_component_runstate_set(sw_data->service_id, 0);
+       if (ret < 0)
+               return ret;
+
+       /* Wait for the service to complete its final iteration before
+        * stopping.
+        */
+       while (sw_data->service_phase != 0)
+               rte_pause();
+
+       /* Read barrier after observing the phase value; the service function
+        * issues a write barrier after each phase update.
+        */
+       rte_smp_rmb();
+
+       return 0;
+}
+
+/* Software adapter "get_info" op.  Currently a no-op: neither argument is
+ * read or written.
+ */
+static void
+sw_event_timer_adapter_get_info(const struct rte_event_timer_adapter *adapter,
+               struct rte_event_timer_adapter_info *adapter_info)
+{
+       RTE_SET_USED(adapter);
+       RTE_SET_USED(adapter_info);
+       /* nothing for now */
+}
+
+/* Software adapter "stats_get" op: copy the adapter's counters into *stats.
+ * Always returns 0.
+ */
+static int
+sw_event_timer_adapter_stats_get(const struct rte_event_timer_adapter *adapter,
+                                struct rte_event_timer_adapter_stats *stats)
+{
+       const struct rte_event_timer_adapter_sw_data *priv =
+                                               adapter->data->adapter_priv;
+
+       *stats = priv->stats;
+
+       return 0;
+}
+
+/* Software adapter "stats_reset" op: zero all of the adapter's counters.
+ * Always returns 0.
+ */
+static int
+sw_event_timer_adapter_stats_reset(
+                               const struct rte_event_timer_adapter *adapter)
+{
+       struct rte_event_timer_adapter_sw_data *priv =
+                                               adapter->data->adapter_priv;
+
+       memset(&priv->stats, 0, sizeof(priv->stats));
+
+       return 0;
+}
+
+/* Common implementation of the software adapter's arm operations.
+ *
+ * Validates each event timer in order and, for each one that passes,
+ * enqueues an ARM message to the service and marks the timer
+ * RTE_EVENT_TIMER_ARMED.  Processing stops at the first timer that fails a
+ * check or cannot be enqueued; rte_errno then holds the reason.
+ *
+ * Returns the number of event timers for which an ARM message was enqueued.
+ * Returns 0 with rte_errno = ENOSPC if nb_evtims msg objects cannot be
+ * obtained from the pool.
+ */
+static __rte_always_inline int
+__sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
+                         struct rte_event_timer **evtims,
+                         uint16_t nb_evtims)
+{
+       int i, ret;
+       struct rte_event_timer_adapter_sw_data *sw_data;
+       struct msg *msgs[nb_evtims];
+
+#ifdef RTE_LIBRTE_EVENTDEV_DEBUG
+       /* Check that the service is running. */
+       if (rte_service_runstate_get(adapter->data->service_id) != 1) {
+               rte_errno = EINVAL;
+               return 0;
+       }
+#endif
+
+       sw_data = adapter->data->adapter_priv;
+
+       /* One msg object per requested timer, obtained all-or-nothing */
+       ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
+       if (ret < 0) {
+               rte_errno = ENOSPC;
+               return 0;
+       }
+
+       /* Let the service know we're producing messages for it to process */
+       rte_atomic16_inc(&sw_data->message_producer_count);
+
+       /* If the service is managing timers, wait for it to finish */
+       while (sw_data->service_phase == 2)
+               rte_pause();
+
+       rte_smp_rmb();
+
+       for (i = 0; i < nb_evtims; i++) {
+               /* Don't modify the event timer state in these cases */
+               if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
+                       rte_errno = EALREADY;
+                       break;
+               } else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
+                   evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
+                       rte_errno = EINVAL;
+                       break;
+               }
+
+               /* check_timeout(): -1 means above max_tmo_ns, -2 means
+                * shorter than one adapter tick.
+                */
+               ret = check_timeout(evtims[i], adapter);
+               if (ret == -1) {
+                       evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
+                       rte_errno = EINVAL;
+                       break;
+               }
+               if (ret == -2) {
+                       evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
+                       rte_errno = EINVAL;
+                       break;
+               }
+
+               if (check_destination_event_queue(evtims[i], adapter) < 0) {
+                       evtims[i]->state = RTE_EVENT_TIMER_ERROR;
+                       rte_errno = EINVAL;
+                       break;
+               }
+
+               /* Checks passed, set up a message to enqueue */
+               msgs[i]->type = MSG_TYPE_ARM;
+               msgs[i]->evtim = evtims[i];
+
+               /* msg objects that get enqueued successfully will be freed
+                * either by a future cancel operation or by the timer
+                * expiration callback.
+                */
+               if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
+                       rte_errno = ENOSPC;
+                       break;
+               }
+
+               EVTIM_LOG_DBG("enqueued ARM message to ring");
+
+               evtims[i]->state = RTE_EVENT_TIMER_ARMED;
+       }
+
+       /* Let the service know we're done producing messages */
+       rte_atomic16_dec(&sw_data->message_producer_count);
+
+       /* Return the msg objects that were not enqueued to the pool */
+       if (i < nb_evtims)
+               rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i],
+                                    nb_evtims - i);
+
+       return i;
+}
+
+/* Software adapter "arm_burst" op: arm each event timer using the
+ * timeout_ticks value already stored in it.  Returns the number of event
+ * timers armed, as reported by __sw_event_timer_arm_burst().
+ */
+static int
+sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
+                        struct rte_event_timer **evtims,
+                        uint16_t nb_evtims)
+{
+       return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims);
+}
+
+/* Software adapter "cancel_burst" op.
+ *
+ * For each event timer in order, enqueues a CANCEL message to the service
+ * and marks the timer RTE_EVENT_TIMER_CANCELED.  Processing stops at the
+ * first timer that is already canceled (rte_errno = EALREADY), is not armed
+ * (EINVAL), or cannot be enqueued (ENOSPC).
+ *
+ * Returns the number of event timers for which a CANCEL message was
+ * enqueued.  Returns 0 with rte_errno = ENOSPC if nb_evtims msg objects
+ * cannot be obtained from the pool.
+ */
+static int
+sw_event_timer_cancel_burst(const struct rte_event_timer_adapter *adapter,
+                           struct rte_event_timer **evtims,
+                           uint16_t nb_evtims)
+{
+       int i, ret;
+       struct rte_event_timer_adapter_sw_data *sw_data;
+       struct msg *msgs[nb_evtims];
+
+#ifdef RTE_LIBRTE_EVENTDEV_DEBUG
+       /* Check that the service is running. */
+       if (rte_service_runstate_get(adapter->data->service_id) != 1) {
+               rte_errno = EINVAL;
+               return 0;
+       }
+#endif
+
+       sw_data = adapter->data->adapter_priv;
+
+       /* One msg object per requested timer, obtained all-or-nothing */
+       ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
+       if (ret < 0) {
+               rte_errno = ENOSPC;
+               return 0;
+       }
+
+       /* Let the service know we're producing messages for it to process */
+       rte_atomic16_inc(&sw_data->message_producer_count);
+
+       /* If the service could be modifying event timer states, wait */
+       while (sw_data->service_phase == 2)
+               rte_pause();
+
+       rte_smp_rmb();
+
+       for (i = 0; i < nb_evtims; i++) {
+               /* Don't modify the event timer state in these cases */
+               if (evtims[i]->state == RTE_EVENT_TIMER_CANCELED) {
+                       rte_errno = EALREADY;
+                       break;
+               } else if (evtims[i]->state != RTE_EVENT_TIMER_ARMED) {
+                       rte_errno = EINVAL;
+                       break;
+               }
+
+               msgs[i]->type = MSG_TYPE_CANCEL;
+               msgs[i]->evtim = evtims[i];
+
+               if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
+                       rte_errno = ENOSPC;
+                       break;
+               }
+
+               EVTIM_LOG_DBG("enqueued CANCEL message to ring");
+
+               evtims[i]->state = RTE_EVENT_TIMER_CANCELED;
+       }
+
+       /* Let the service know we're done producing messages */
+       rte_atomic16_dec(&sw_data->message_producer_count);
+
+       /* Return the msg objects that were not enqueued to the pool */
+       if (i < nb_evtims)
+               rte_mempool_put_bulk(sw_data->msg_pool, (void **)&msgs[i],
+                                    nb_evtims - i);
+
+       return i;
+}
+
+/* Software adapter "arm_tmo_tick_burst" op: store the same timeout_ticks
+ * value in every event timer of the burst, then arm them all.  Returns the
+ * number of event timers armed.
+ */
+static int
+sw_event_timer_arm_tmo_tick_burst(const struct rte_event_timer_adapter *adapter,
+                                 struct rte_event_timer **evtims,
+                                 uint64_t timeout_ticks,
+                                 uint16_t nb_evtims)
+{
+       int idx = 0;
+
+       while (idx < nb_evtims) {
+               evtims[idx]->timeout_ticks = timeout_ticks;
+               idx++;
+       }
+
+       return __sw_event_timer_arm_burst(adapter, evtims, nb_evtims);
+}
+
+/* Ops table of the default software implementation.  The adapter create and
+ * lookup paths install it when adapter->ops is NULL.
+ */
+const struct rte_event_timer_adapter_ops sw_event_adapter_timer_ops = {
+       .init = sw_event_timer_adapter_init,
+       .uninit = sw_event_timer_adapter_uninit,
+       .start = sw_event_timer_adapter_start,
+       .stop = sw_event_timer_adapter_stop,
+       .get_info = sw_event_timer_adapter_get_info,
+       .stats_get = sw_event_timer_adapter_stats_get,
+       .stats_reset = sw_event_timer_adapter_stats_reset,
+       .arm_burst = sw_event_timer_arm_burst,
+       .arm_tmo_tick_burst = sw_event_timer_arm_tmo_tick_burst,
+       .cancel_burst = sw_event_timer_cancel_burst,
+};
+
 RTE_INIT(event_timer_adapter_init_log);
 static void
 event_timer_adapter_init_log(void)
@@ -456,4 +1321,13 @@ event_timer_adapter_init_log(void)
        evtim_logtype = rte_log_register("lib.eventdev.adapter.timer");
        if (evtim_logtype >= 0)
                rte_log_set_level(evtim_logtype, RTE_LOG_NOTICE);
+
+       evtim_buffer_logtype = rte_log_register("lib.eventdev.adapter.timer."
+                                               "buffer");
+       if (evtim_buffer_logtype >= 0)
+               rte_log_set_level(evtim_buffer_logtype, RTE_LOG_NOTICE);
+
+       evtim_svc_logtype = rte_log_register("lib.eventdev.adapter.timer.svc");
+       if (evtim_svc_logtype >= 0)
+               rte_log_set_level(evtim_svc_logtype, RTE_LOG_NOTICE);
 }
diff --git a/lib/librte_eventdev/rte_event_timer_adapter.h b/lib/librte_eventdev/rte_event_timer_adapter.h
index 1c8a45b..5c223d6 100644
--- a/lib/librte_eventdev/rte_event_timer_adapter.h
+++ b/lib/librte_eventdev/rte_event_timer_adapter.h
@@ -335,6 +335,8 @@ rte_event_timer_adapter_get_info(
  *   - 0: Success, adapter started.
  *   - <0: Error code returned by the driver start function.
  *   - -EINVAL if adapter identifier invalid
+ *   - -ENOENT if software adapter but no service core mapped
+ *   - -ENOTSUP if software adapter and more than one service core mapped
  */
 int __rte_experimental
 rte_event_timer_adapter_start(
@@ -461,6 +463,59 @@ int __rte_experimental rte_event_timer_adapter_stats_reset(
                struct rte_event_timer_adapter *adapter);
 
 /**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Retrieve the service ID of the event timer adapter. If the adapter doesn't
+ * use an rte_service function, this function returns -ESRCH.
+ *
+ * @param adapter
+ *   A pointer to an event timer adapter.
+ *
+ * @param [out] service_id
+ *   A pointer to a uint32_t, to be filled in with the service id.
+ *
+ * @return
+ *   - 0: Success
+ *   - <0: Error code on failure; -ESRCH if the adapter doesn't use an
+ *   rte_service function.
+ */
+int __rte_experimental
+rte_event_timer_adapter_service_id_get(struct rte_event_timer_adapter *adapter,
+                                      uint32_t *service_id);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Retrieve statistics for an event timer adapter instance.
+ *
+ * @param adapter
+ *   A pointer to an event timer adapter structure.
+ * @param[out] stats
+ *   A pointer to a structure to fill with statistics.
+ *
+ * @return
+ *   - 0: Successfully retrieved.
+ *   - <0: Failure; error code returned.
+ */
+int __rte_experimental rte_event_timer_adapter_stats_get(
+               struct rte_event_timer_adapter *adapter,
+               struct rte_event_timer_adapter_stats *stats);
+
+/*
+ * NOTE(review): a prototype for rte_event_timer_adapter_stats_reset() already
+ * appears immediately above in this header; confirm that this redeclaration
+ * is intended.
+ */
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Reset statistics for an event timer adapter instance.
+ *
+ * @param adapter
+ *   A pointer to an event timer adapter structure.
+ *
+ * @return
+ *   - 0: Successfully reset.
+ *   - <0: Failure; error code returned.
+ */
+int __rte_experimental rte_event_timer_adapter_stats_reset(
+               struct rte_event_timer_adapter *adapter);
+
+/**
  * @warning
  * @b EXPERIMENTAL: this structure may change without prior notice
  *
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index 3eb41d1..83660df 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -80,7 +80,6 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_BITRATE)        += -lrte_bitratestats
 _LDLIBS-$(CONFIG_RTE_LIBRTE_LATENCY_STATS)  += -lrte_latencystats
 _LDLIBS-$(CONFIG_RTE_LIBRTE_POWER)          += -lrte_power
 
-_LDLIBS-$(CONFIG_RTE_LIBRTE_TIMER)          += -lrte_timer
 _LDLIBS-$(CONFIG_RTE_LIBRTE_EFD)            += -lrte_efd
 
 _LDLIBS-y += --whole-archive
@@ -98,6 +97,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_CRYPTODEV)      += -lrte_cryptodev
 _LDLIBS-$(CONFIG_RTE_LIBRTE_SECURITY)       += -lrte_security
 _LDLIBS-$(CONFIG_RTE_LIBRTE_EVENTDEV)       += -lrte_eventdev
 _LDLIBS-$(CONFIG_RTE_LIBRTE_RAWDEV)         += -lrte_rawdev
+_LDLIBS-$(CONFIG_RTE_LIBRTE_TIMER)          += -lrte_timer
 _LDLIBS-$(CONFIG_RTE_LIBRTE_MEMPOOL)        += -lrte_mempool
 _LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_RING)   += -lrte_mempool_ring
 _LDLIBS-$(CONFIG_RTE_LIBRTE_RING)           += -lrte_ring
-- 
2.6.4

Reply via email to