On 2021/3/16 2:53, Jakub Kicinski wrote:
> On Mon, 15 Mar 2021 11:10:18 +0800 Yunsheng Lin wrote:
>> @@ -606,6 +623,11 @@ static const u8 prio2band[TC_PRIO_MAX + 1] = {
>>   */
>>  struct pfifo_fast_priv {
>>      struct skb_array q[PFIFO_FAST_BANDS];
>> +
>> +    /* protect against data race between enqueue/dequeue and
>> +     * qdisc->empty setting
>> +     */
>> +    spinlock_t lock;
>>  };
>>  
>>  static inline struct skb_array *band2list(struct pfifo_fast_priv *priv,
>> @@ -623,7 +645,10 @@ static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc *qdisc,
>>      unsigned int pkt_len = qdisc_pkt_len(skb);
>>      int err;
>>  
>> -    err = skb_array_produce(q, skb);
>> +    spin_lock(&priv->lock);
>> +    err = __ptr_ring_produce(&q->ring, skb);
>> +    WRITE_ONCE(qdisc->empty, false);
>> +    spin_unlock(&priv->lock);
>>  
>>      if (unlikely(err)) {
>>              if (qdisc_is_percpu_stats(qdisc))
>> @@ -642,6 +667,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
>>      struct sk_buff *skb = NULL;
>>      int band;
>>  
>> +    spin_lock(&priv->lock);
>>      for (band = 0; band < PFIFO_FAST_BANDS && !skb; band++) {
>>              struct skb_array *q = band2list(priv, band);
>>  
>> @@ -655,6 +681,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
>>      } else {
>>              WRITE_ONCE(qdisc->empty, true);
>>      }
>> +    spin_unlock(&priv->lock);
>>  
>>      return skb;
>>  }
> 
> I thought pfifo was supposed to be "lockless" and this change
> re-introduces a lock between producer and consumer, no?

Yes, for now the lock breaks the "lockless" property of the lockless qdisc.
I do not know how to solve the data race below without a lock:

        CPU1:                                   CPU2:
      dequeue skb                                .
          .                                      .      
          .                                 enqueue skb
          .                                      .
          .                      WRITE_ONCE(qdisc->empty, false);
          .                                      .
          .                                      .
WRITE_ONCE(qdisc->empty, true);

If the above happens, qdisc->empty is true even though the qdisc still holds
an skb, which may cause out-of-order delivery or a stuck packet.
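
To make the interleaving concrete, here is a minimal single-threaded C
sketch that replays the four steps of the diagram in order. It is only an
illustration: struct toy_qdisc and the toy_* helpers are hypothetical
names, not kernel code, and plain stores stand in for WRITE_ONCE().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a qdisc: a one-slot queue plus an "empty" hint. */
struct toy_qdisc {
	void *slot;	/* the queued packet, or NULL */
	bool empty;	/* hint that is updated separately from the queue */
};

/* CPU1, step 1: take whatever is queued (possibly nothing). */
static void *toy_dequeue_step(struct toy_qdisc *q)
{
	void *skb = q->slot;

	q->slot = NULL;
	return skb;
}

/* CPU2, step 2: queue one packet into the free slot. */
static void toy_enqueue_step(struct toy_qdisc *q, void *skb)
{
	assert(q->slot == NULL);
	q->slot = skb;
}

/*
 * Replay the diagram's interleaving on a single thread.
 * Returns true if the hint says "empty" while a packet is still queued.
 */
bool toy_replay_race(void)
{
	static int pkt;	/* dummy packet */
	struct toy_qdisc q = { .slot = NULL, .empty = true };
	void *skb;

	skb = toy_dequeue_step(&q);	/* CPU1: dequeue skb, finds nothing */
	toy_enqueue_step(&q, &pkt);	/* CPU2: enqueue skb */
	q.empty = false;		/* CPU2: WRITE_ONCE(qdisc->empty, false) */
	if (!skb)
		q.empty = true;		/* CPU1: WRITE_ONCE(qdisc->empty, true) */

	return q.empty && q.slot != NULL;
}
```

In this replay toy_replay_race() returns true: the last store wins, so the
hint claims the queue is empty although the packet from step 2 is still in
the slot.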

It seems we may need to update the ptr_ring's status (empty or not) atomically
with enqueuing/dequeuing, inside the ptr_ring implementation.
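
One possible reading of that idea, as a rough user-space C11 sketch rather
than a proposal for ptr_ring itself: keep an atomic entry count that is
changed only by read-modify-write operations in produce/consume, and derive
"empty" from it instead of storing a separate flag. Everything here
(struct toy_ring, the toy_ring_* functions, TOY_RING_SIZE) is hypothetical,
and the sketch assumes a single producer and a single consumer.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_RING_SIZE 4	/* arbitrary size for illustration */

/* Hypothetical toy ring, not the kernel's ptr_ring. */
struct toy_ring {
	void *slots[TOY_RING_SIZE];
	unsigned int head;	/* next slot to produce into */
	unsigned int tail;	/* next slot to consume from */
	atomic_int count;	/* number of queued entries */
};

/* "Empty" is derived from the count, never stored on its own. */
bool toy_ring_empty(struct toy_ring *r)
{
	return atomic_load(&r->count) == 0;
}

/* Single producer: returns 0 on success, -1 if the ring is full. */
int toy_ring_produce(struct toy_ring *r, void *ptr)
{
	assert(ptr != NULL);
	if (atomic_load(&r->count) == TOY_RING_SIZE)
		return -1;
	r->slots[r->head] = ptr;
	r->head = (r->head + 1) % TOY_RING_SIZE;
	atomic_fetch_add(&r->count, 1);	/* publish the entry */
	return 0;
}

/* Single consumer: returns the oldest entry, or NULL if the ring is empty. */
void *toy_ring_consume(struct toy_ring *r)
{
	void *ptr;

	if (atomic_load(&r->count) == 0)
		return NULL;
	ptr = r->slots[r->tail];
	r->tail = (r->tail + 1) % TOY_RING_SIZE;
	atomic_fetch_sub(&r->count, 1);
	return ptr;
}

/*
 * Replay the same interleaving as in the diagram: a failed consume
 * followed by a produce. Returns true if the ring reports non-empty.
 */
bool toy_ring_replay(void)
{
	static int pkt;	/* dummy packet */
	struct toy_ring r = { .head = 0, .tail = 0 };
	void *skb;

	atomic_init(&r.count, 0);
	skb = toy_ring_consume(&r);	/* CPU1: dequeue finds nothing */
	if (toy_ring_produce(&r, &pkt))	/* CPU2: enqueue skb */
		return false;
	/* CPU1 has no blind "empty = true" store left to clobber the state. */
	return skb == NULL && !toy_ring_empty(&r);
}
```

The point of the design is that a failed consume no longer writes anything,
so there is no late store that can overwrite the producer's update. Whether
the extra atomic operation per packet is acceptable on the fast path is a
separate question that this sketch does not answer.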

Any better idea?

