On 2021/3/16 8:35, Yunsheng Lin wrote:
> On 2021/3/16 2:53, Jakub Kicinski wrote:
>> On Mon, 15 Mar 2021 11:10:18 +0800 Yunsheng Lin wrote:
>>> @@ -606,6 +623,11 @@ static const u8 prio2band[TC_PRIO_MAX + 1] = {
>>>   */
>>>  struct pfifo_fast_priv {
>>>     struct skb_array q[PFIFO_FAST_BANDS];
>>> +
>>> +   /* protect against data race between enqueue/dequeue and
>>> +    * qdisc->empty setting
>>> +    */
>>> +   spinlock_t lock;
>>>  };
>>>  
>>>  static inline struct skb_array *band2list(struct pfifo_fast_priv *priv,
>>> @@ -623,7 +645,10 @@ static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc *qdisc,
>>>     unsigned int pkt_len = qdisc_pkt_len(skb);
>>>     int err;
>>>  
>>> -   err = skb_array_produce(q, skb);
>>> +   spin_lock(&priv->lock);
>>> +   err = __ptr_ring_produce(&q->ring, skb);
>>> +   WRITE_ONCE(qdisc->empty, false);
>>> +   spin_unlock(&priv->lock);
>>>  
>>>     if (unlikely(err)) {
>>>             if (qdisc_is_percpu_stats(qdisc))
>>> @@ -642,6 +667,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
>>>     struct sk_buff *skb = NULL;
>>>     int band;
>>>  
>>> +   spin_lock(&priv->lock);
>>>     for (band = 0; band < PFIFO_FAST_BANDS && !skb; band++) {
>>>             struct skb_array *q = band2list(priv, band);
>>>  
>>> @@ -655,6 +681,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
>>>     } else {
>>>             WRITE_ONCE(qdisc->empty, true);
>>>     }
>>> +   spin_unlock(&priv->lock);
>>>  
>>>     return skb;
>>>  }
>>
>> I thought pfifo was supposed to be "lockless" and this change
>> re-introduces a lock between producer and consumer, no?
> 
> Yes, the lock breaks the "lockless" property of the lockless qdisc for now.
> I do not know how to solve the data race below locklessly:
> 
>       CPU1:                                   CPU2:
>       dequeue skb                              .
>         .                                      .      
>         .                                 enqueue skb
>         .                                      .
>         .                      WRITE_ONCE(qdisc->empty, false);
>         .                                      .
>         .                                      .
> WRITE_ONCE(qdisc->empty, true);
> 
> If the above happens, qdisc->empty is true even though the qdisc still holds
> an skb, which may cause out-of-order delivery or a stuck packet.
> 
> It seems we may need to update the ptr_ring's status (empty or not) atomically
> with enqueuing/dequeuing in the ptr_ring implementation.
> 
> Any better idea?

It seems we can use __ptr_ring_empty() under the protection of qdisc->seqlock,
because qdisc->seqlock clearly serves the role of r->consumer_lock.
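
To make the idea concrete, here is a minimal userspace sketch, not kernel
code. It replays the interleaving from the diagram above in program order on a
single thread, first with the unconditional flag write and then with the
consumer deriving emptiness from the ring itself. All toy_* names and both
replay_* helpers are hypothetical stand-ins invented for illustration;
toy_ring_empty() only mimics the idea of __ptr_ring_empty() by looking at the
consumer slot. Locking and memory ordering are deliberately left out, so this
shows the logic only and is not a proof that the real change is race-free.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define RING_SIZE 8

/* Hypothetical stand-in for ptr_ring: a NULL slot means "no entry here". */
struct toy_ring {
	void *slots[RING_SIZE];
	size_t producer;
	size_t consumer;
};

/* Hypothetical stand-in for the qdisc: one ring plus the cached flag. */
struct toy_qdisc {
	struct toy_ring ring;
	bool empty;	/* models qdisc->empty */
};

static int toy_produce(struct toy_ring *r, void *ptr)
{
	if (r->slots[r->producer])
		return -1;	/* ring is full */
	r->slots[r->producer] = ptr;
	r->producer = (r->producer + 1) % RING_SIZE;
	return 0;
}

static void *toy_consume(struct toy_ring *r)
{
	void *ptr = r->slots[r->consumer];

	if (!ptr)
		return NULL;	/* nothing queued */
	r->slots[r->consumer] = NULL;
	r->consumer = (r->consumer + 1) % RING_SIZE;
	return ptr;
}

/* Emptiness is read from the consumer slot, not from a cached flag. */
static bool toy_ring_empty(const struct toy_ring *r)
{
	return r->slots[r->consumer] == NULL;
}

/* Interleaving from the diagram: the late unconditional write wins. */
static bool replay_stale_flag(struct toy_qdisc *q, void *skb)
{
	void *got = toy_consume(&q->ring);	/* CPU1: dequeue finds nothing */
	int err = toy_produce(&q->ring, skb);	/* CPU2: enqueue skb */

	assert(err == 0);
	(void)err;
	q->empty = false;			/* CPU2: flag cleared */
	if (!got)
		q->empty = true;		/* CPU1: stale write */
	return q->empty;
}

/* Same interleaving, but CPU1 re-reads the ring before updating the flag. */
static bool replay_ring_check(struct toy_qdisc *q, void *skb)
{
	void *got = toy_consume(&q->ring);	/* CPU1: dequeue finds nothing */
	int err = toy_produce(&q->ring, skb);	/* CPU2: enqueue skb */

	assert(err == 0);
	(void)err;
	q->empty = false;			/* CPU2: flag cleared */
	if (!got)
		q->empty = toy_ring_empty(&q->ring);	/* CPU1: ask the ring */
	return q->empty;
}
```

Called on a zero-initialized struct toy_qdisc, replay_stale_flag() returns true
while the ring still holds the entry, which is the stuck-packet case;
replay_ring_check() returns false for the same sequence. In the real qdisc the
check would have to run while holding qdisc->seqlock, as suggested above.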

> 
>>
>> .
>>