On Mon, Nov 10, 2014 at 02:52:47PM +0200, Qinglai Xiao wrote:
> With the introduction of in_flight_bitmask, the whole 32 bits of the tag
> can be used. Furthermore, this patch fixes the integer overflow when
> finding the matched tags.
> Note that currently librte_distributor supports up to 64 worker threads.
> If more workers are needed, the size of in_flight_bitmask and the
> algorithm for finding a matched tag must be revised.
> 
> Signed-off-by: Qinglai Xiao <jigsaw at gmail.com>
> ---
>  lib/librte_distributor/rte_distributor.c |   45 ++++++++++++++++++++++--------
>  lib/librte_distributor/rte_distributor.h |    4 ++
>  2 files changed, 37 insertions(+), 12 deletions(-)
> 
> diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
> index 3dfec4a..3dfccae 100644
> --- a/lib/librte_distributor/rte_distributor.c
> +++ b/lib/librte_distributor/rte_distributor.c
> @@ -92,7 +92,13 @@ struct rte_distributor {
>       unsigned num_workers;                 /**< Number of workers polling */
>  
>       uint32_t in_flight_tags[RTE_MAX_LCORE];
> -             /**< Tracks the tag being processed per core, 0 == no pkt */
> +             /**< Tracks the tag being processed per core */
> +     uint64_t in_flight_bitmask;
> +             /**< on/off bits for in-flight tags.
> +              * Note that if RTE_MAX_LCORE is larger than 64 then
> +              * the bitmask has to expand.
> +              */

I would suggest for this that we break the link with RTE_MAX_LCORE. Instead,
we can just enforce a hard limit on the distributor so that it can only work
with up to 64 worker cores. That should avoid any complications.

I would suggest we add a further check in the create function, something like
the below:

if (num_workers >= sizeof(d->in_flight_bitmask) * CHAR_BIT) {
        rte_errno = .....
}
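
For completeness, here is the sort of thing I mean, as a sketch -- EINVAL and
the early return are just my assumption of how we would want it to fail, and
CHAR_BIT needs <limits.h>. Since sizeof() doesn't evaluate its operand, the
check is fine even before d is allocated:

if (num_workers >= sizeof(d->in_flight_bitmask) * CHAR_BIT) {
        rte_errno = EINVAL;
        return NULL;
}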

> +
>       struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
>  
>       union rte_distributor_buffer bufs[RTE_MAX_LCORE];
> @@ -189,6 +195,7 @@ static inline void
>  handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
>  {
>       d->in_flight_tags[wkr] = 0;
> +     d->in_flight_bitmask &= ~(1UL << wkr);
>       d->bufs[wkr].bufptr64 = 0;
>       if (unlikely(d->backlog[wkr].count != 0)) {
>               /* On return of a packet, we need to move the
> @@ -211,7 +218,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
>                       pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
>                                       RTE_DISTRIB_FLAG_BITS));
>               }
> -             /* recursive call */
> +             /* recursive call.
> +              * Note that the tags were set before the first-level call
> +              * to rte_distributor_process.
> +              */
>               rte_distributor_process(d, pkts, i);
>               bl->count = bl->start = 0;
>       }
> @@ -242,6 +252,7 @@ process_returns(struct rte_distributor *d)
>                       else {
>                               d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
>                               d->in_flight_tags[wkr] = 0;
> +                             d->in_flight_bitmask &= ~(1UL << wkr);
>                       }
>                       oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
>               } else if (data & RTE_DISTRIB_RETURN_BUF) {
> @@ -284,14 +295,18 @@ rte_distributor_process(struct rte_distributor *d,
>                       next_value = (((int64_t)(uintptr_t)next_mb)
>                                       << RTE_DISTRIB_FLAG_BITS);
>                       /*
> -                      * Set the low bit on the tag, so we can guarantee that
> -                      * we never store a tag value of zero. That means we can
> -                      * use the zero-value to indicate that no packet is
> -                      * being processed by a worker.
> +                      * The user is advised to set the tag value for each
> +                      * mbuf before calling rte_distributor_process.
> +                      * User-defined tags are used to identify flows
> +                      * or sessions.
>                        */
> -                     new_tag = (next_mb->hash.usr | 1);
> +                     new_tag = next_mb->hash.usr;
>  
> -                     uint32_t match = 0;
> +                     /*
> +                      * Note that if RTE_MAX_LCORE is larger than 64 then
> +                      * the size of match has to be expanded.
> +                      */
> +                     uint64_t match = 0;
>                       unsigned i;
>                       /*
>                        * to scan for a match use "xor" and "not" to get a 0/1
> @@ -303,9 +318,12 @@ rte_distributor_process(struct rte_distributor *d,
>                               match |= (!(d->in_flight_tags[i] ^ new_tag)
>                                       << i);
>  
> +                     /* Only turned-on bits are considered as a match */
> +                     match &= d->in_flight_bitmask;
> +
>                       if (match) {
>                               next_mb = NULL;
> -                             unsigned worker = __builtin_ctz(match);
> +                             unsigned worker = __builtin_ctzl(match);
>                               if (add_to_backlog(&d->backlog[worker],
>                                               next_value) < 0)
>                                       next_idx--;
> @@ -322,6 +340,7 @@ rte_distributor_process(struct rte_distributor *d,
>                       else {
>                               d->bufs[wkr].bufptr64 = next_value;
>                               d->in_flight_tags[wkr] = new_tag;
> +                             d->in_flight_bitmask |= (1UL << wkr);
>                               next_mb = NULL;
>                       }
>                       oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
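
Just for readers following along, the matching logic pulled out as a
standalone sketch (plain locals standing in for the struct members; note the
explicit uint64_t cast in my version, since "!" yields an int and the shift
count can reach 63):

uint64_t match = 0;
unsigned i;

/* set bit i when worker i's in-flight tag equals new_tag */
for (i = 0; i < num_workers; i++)
        match |= ((uint64_t)!(in_flight_tags[i] ^ new_tag)) << i;

/* only workers that actually hold a packet count as a match */
match &= in_flight_bitmask;

if (match) {
        /* lowest-numbered matching worker; the packet is queued behind it */
        unsigned worker = __builtin_ctzl(match);
}
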
> @@ -379,11 +398,13 @@ rte_distributor_returned_pkts(struct rte_distributor *d,
>  static inline unsigned
>  total_outstanding(const struct rte_distributor *d)
>  {
> -     unsigned wkr, total_outstanding = 0;
> +     unsigned wkr, total_outstanding;
> +
> +     total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
>  
>       for (wkr = 0; wkr < d->num_workers; wkr++)
> -             total_outstanding += d->backlog[wkr].count +
> -                             !!(d->in_flight_tags[wkr]);
> +             total_outstanding += d->backlog[wkr].count;
> +
>       return total_outstanding;
>  }
>  
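
To spell out the new accounting: with, say, in_flight_bitmask = 0xb (workers
0, 1 and 3 busy) and backlog counts {2, 0, 1, 0}, total_outstanding() gives
3 in-flight + 3 backlogged = 6.
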
> diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h
> index ec0d74a..cc1d559 100644
> --- a/lib/librte_distributor/rte_distributor.h
> +++ b/lib/librte_distributor/rte_distributor.h
> @@ -88,6 +88,10 @@ rte_distributor_create(const char *name, unsigned socket_id,
>   * packets. The distributor will ensure that no two packets that have the
>   * same flow id, or tag, in the mbuf will be processed at the same time.
>   *
> + * The user is advised to set the tag for each mbuf before calling this function.
> + * If the user doesn't set the tag, the tag value can vary depending on the
> + * driver implementation and configuration.
> + *
>   * This is not multi-thread safe and should only be called on a single lcore.
>   *
>   * @param d
> -- 
> 1.7.1
> 
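
On the new note in rte_distributor.h about setting the tag before calling
rte_distributor_process(): for illustration, a caller would do roughly the
following (compute_flow_tag() is just a placeholder for whatever per-flow
value the application derives):

unsigned i;

for (i = 0; i < nb_rx; i++)
        /* tag each mbuf with an application-defined flow identifier */
        mbufs[i]->hash.usr = compute_flow_tag(mbufs[i]);

rte_distributor_process(d, mbufs, nb_rx);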
