Ack

On Thu, Apr 17, 2025 at 1:11 PM Liu Kui <kui....@virtuozzo.com> wrote:
>
> It's never a good idea of having to take the mutex lock of kernel rpc
> in the path of ioctl from userspace, because it could block the calling
> thread for very long time, especially when kernel rpc is trying to
> establish a connection.
>
> Currently it needs to take the mutex lock in krpc abort because it
> wants to clean up all aborting msg from kernel rpc which is not easy
> to do without taking the mutex lock. So this approach is perhaps
> unfixable.
>
> So this patch tries a different approach for krpc abort. Two flag
> bits PCS_MSG_BUSY and PCS_MSG_ABORT are introduced. Instead of
> removing aborting msgs from kernel rpc in krp abort, it now only
> set the PCS_MSG_ABORT flag bit to aborting msg, preventing netio
> from accessing associated user buffers. Meanwhile it needs to wait
> if the PCS_MSG_BUSY flag bit is set, meaning the aborting msg is
> currently under I/O by netio. It's very unlikely that the PCS_MSG_BUSY
> flag bit stays set for long time, thus unlikely blocking krpc
> abort for long time.
>
> Fixes: #VSTOR-104248
> https://virtuozzo.atlassian.net/browse/VSTOR-104248
>
> Signed-off-by: Liu Kui <kui....@virtuozzo.com>
> ---
>  fs/fuse/kio/pcs/pcs_cs.c      |  1 +
>  fs/fuse/kio/pcs/pcs_krpc.c    | 66 ++++++++++++-----------------------
>  fs/fuse/kio/pcs/pcs_rdma_io.c | 14 ++++++++
>  fs/fuse/kio/pcs/pcs_rpc.c     |  2 ++
>  fs/fuse/kio/pcs/pcs_sock_io.c | 35 ++++++++++++++++++-
>  fs/fuse/kio/pcs/pcs_sock_io.h | 14 ++++++++
>  6 files changed, 88 insertions(+), 44 deletions(-)
>
> diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
> index ad398acb03ef..299ece862cd3 100644
> --- a/fs/fuse/kio/pcs/pcs_cs.c
> +++ b/fs/fuse/kio/pcs/pcs_cs.c
> @@ -715,6 +715,7 @@ static void do_cs_submit(struct pcs_cs *cs, struct 
> pcs_int_request *ireq)
>         pcs_clear_error(&msg->error);
>         msg->done = cs_sent;
>         msg->get_iter = aligned_msg ? cs_get_data_aligned : cs_get_data;
> +       msg->flags = 0;
>
>         if ((map->state & PCS_MAP_DEAD) || (map->cs_list != csl)) {
>                 ireq->error.value = PCS_ERR_CSD_STALE_MAP;
> diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
> index 087d1d966e0d..3e404a2d0bde 100644
> --- a/fs/fuse/kio/pcs/pcs_krpc.c
> +++ b/fs/fuse/kio/pcs/pcs_krpc.c
> @@ -155,6 +155,20 @@ struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct 
> pcs_rpc_hdr *h)
>
>         kreq = msg->private2;
>
> +       set_bit(PCS_MSG_BUSY, &msg->flags);
> +
> +       /* pair with the smp_mb() between set PCS_MSG_ABORTED and test 
> PCS_MSG_BUSY */
> +       smp_mb();
> +
> +       if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags))) {
> +               pcs_set_local_error(&msg->error, PCS_ERR_CANCEL_REQUEST);
> +               pcs_msg_del_calendar(msg);
> +               list_del(&msg->list);
> +               msg->stage = PCS_MSG_STAGE_DONE;
> +               msg->done(msg);
> +               return NULL;
> +       }
> +
>         resp = pcs_rpc_alloc_input_msg(ep, sizeof(struct pcs_rpc_hdr));
>         if (!resp)
>                 return NULL;
> @@ -256,6 +270,8 @@ void pcs_krpc_response_done(struct pcs_msg *msg)
>  {
>         struct krpc_req *kreq = msg->private2;
>
> +       clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags);
> +
>         if (msg->rpc) {
>                 pcs_rpc_put(msg->rpc);
>                 msg->rpc = NULL;
> @@ -579,6 +595,7 @@ static int kreq_make_sendmsg(struct krpc_req *kreq)
>         msg->rpc = NULL;
>         msg->done = pcs_krpc_msg_sent;
>         msg->get_iter = krpc_msg_get_data;
> +       msg->flags = 0;
>
>         spin_lock(&krpc->lock);
>         if (krpc->state != PCS_KRPC_STATE_CONNECTED ||
> @@ -711,9 +728,7 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc)
>  {
>         struct krpc_req *kreq, *tmp;
>         struct krpc_completion *comp;
> -       struct pcs_rpc *ep = krpc->rpc;
>         struct pcs_msg *msg;
> -       int timeout = 1000;     /* 10 ms */
>
>         spin_lock(&krpc->lock);
>
> @@ -744,52 +759,16 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc)
>         list_splice_tail_init(&krpc->pending_queue, &krpc->dispose_queue);
>         spin_unlock(&krpc->lock);
>
> -       /* nothing to be done */
> -       if (list_empty(&krpc->dispose_queue))
> -               return 0;
> -
> -       /* abort incomplete requests */
> -       mutex_lock(&ep->mutex);
>         list_for_each_entry_safe(kreq, tmp, &krpc->dispose_queue, link) {
>                 kreq->flags |= KRPC_REQ_F_ABORTED;
>                 msg = &kreq->msg;
> -               /* if msg is cancelled, kreq will be removed from the queue */
> -               pcs_rpc_cancel_msg(ep, msg);
> +               /*
> +                * The msg isn't freed immediately here however the user 
> buffer
> +                * won't be accessed.
> +                */
> +               pcs_msg_abort(msg, true);
>         }
>
> -       /*
> -        * The krpc->dispose_queue should be empty if there are no requests in
> -        * busy state. Otherwise wait until all busy requests complete. This
> -        * should be a extremely rare case, therefore sleep is acceptable 
> here.
> -        *
> -        * We cannot keep references to busy requests while waiting, because
> -        * busy requests could have been freed.
> -        */
> -       while (!list_empty(&krpc->dispose_queue)) {
> -               kreq = list_first_entry(&krpc->dispose_queue, struct 
> krpc_req, link);
> -               msg = &kreq->msg;
> -
> -               /* no longer busy and cancelled */
> -               if (!pcs_rpc_cancel_msg(ep, msg))
> -                       continue;
> -
> -               /* seems somthing wrong happened to hardware, abort the rpc */
> -               if (timeout == 0) {
> -                       rpc_abort(ep, 0, PCS_ERR_NET_ABORT);
> -                       break;
> -               }
> -               mutex_unlock(&ep->mutex);
> -
> -               /* sleep 10 us */
> -               udelay(10);
> -               timeout--;
> -
> -               /* check again */
> -               mutex_lock(&ep->mutex);
> -       }
> -
> -       mutex_unlock(&ep->mutex);
> -
>         return 0;
>  }
>
> @@ -1117,6 +1096,7 @@ int pcs_krpc_connect(struct pcs_krpc_set *krpcs, 
> PCS_NODE_ID_T *id)
>         msg = &connect_req->msg;
>         msg->size = 0;
>         msg->timeout = 0;
> +       msg->flags = 0;
>         msg->rpc = NULL;
>         msg->done = krpc_connect_done;
>         pcs_clear_error(&msg->error);
> diff --git a/fs/fuse/kio/pcs/pcs_rdma_io.c b/fs/fuse/kio/pcs/pcs_rdma_io.c
> index d50f2c1e97e3..1d9e648d2636 100644
> --- a/fs/fuse/kio/pcs/pcs_rdma_io.c
> +++ b/fs/fuse/kio/pcs/pcs_rdma_io.c
> @@ -154,6 +154,7 @@ static struct pcs_msg *rio_dequeue_reserved_msg(struct 
> pcs_rdmaio *rio)
>  static void rio_msg_sent(struct pcs_rdmaio *rio, struct rio_tx *tx, struct 
> pcs_msg *msg, int done)
>  {
>         if (done) {
> +               clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags);
>                 pcs_msg_sent(msg);
>                 msg->done(msg);
>         } else {
> @@ -621,6 +622,19 @@ static int rio_submit(struct pcs_rdmaio *rio, struct 
> pcs_msg *msg, int type, u64
>         int offset = 0;
>         struct iov_iter it;
>
> +       if (msg) {
> +               set_bit(PCS_MSG_BUSY, &msg->flags);
> +
> +               /* pair with the smp_mb() between set PCS_MSG_ABORTED and 
> test PCS_MSG_BUSY */
> +               smp_mb();
> +
> +               if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags))) {
> +                       pcs_set_local_error(&msg->error, 
> PCS_ERR_CANCEL_REQUEST);
> +                       rio_msg_sent(rio, tx, msg, 1);
> +                       return 0;
> +               }
> +       }
> +
>         tx = RE_NULL(rio_get_tx(dev));
>         if (!tx) {
>                 if (allow_again)
> diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
> index 71c2a3b54da7..f15d0c3fb7cd 100644
> --- a/fs/fuse/kio/pcs/pcs_rpc.c
> +++ b/fs/fuse/kio/pcs/pcs_rpc.c
> @@ -1374,6 +1374,7 @@ void pcs_rpc_init_input_msg(struct pcs_rpc * ep, struct 
> pcs_msg * msg, int accou
>         INIT_HLIST_NODE(&msg->kill_link);
>         pcs_rpc_account_msg(ep, msg, account);
>         msg->destructor = pcs_rpc_input_destructor;
> +       msg->flags = 0;
>  }
>
>  struct pcs_msg * pcs_rpc_alloc_input_msg(struct pcs_rpc * ep, int datalen)
> @@ -1408,6 +1409,7 @@ void pcs_rpc_init_output_msg(struct pcs_msg * msg)
>         msg->rpc = NULL;
>         INIT_HLIST_NODE(&msg->kill_link);
>         msg->destructor = pcs_msg_output_destructor;
> +       msg->flags = 0;
>  }
>
>  struct pcs_msg * pcs_rpc_alloc_output_msg(int datalen)
> diff --git a/fs/fuse/kio/pcs/pcs_sock_io.c b/fs/fuse/kio/pcs/pcs_sock_io.c
> index 805b8f1e56b0..7eb231b7260d 100644
> --- a/fs/fuse/kio/pcs/pcs_sock_io.c
> +++ b/fs/fuse/kio/pcs/pcs_sock_io.c
> @@ -35,6 +35,23 @@ void pcs_msg_sent(struct pcs_msg * msg)
>         }
>  }
>
> +int pcs_msg_abort(struct pcs_msg *msg, bool wait)
> +{
> +       set_bit(PCS_MSG_ABORTED, &msg->flags);
> +
> +       /* pair with the smp_mb() between set PCS_MSG_BUSY and test 
> PCS_MSG_ABORTED */
> +       smp_mb();
> +
> +       while (test_bit(PCS_MSG_BUSY, &msg->flags)) {
> +               if (wait)
> +                       wait_on_bit(&msg->flags, PCS_MSG_BUSY, 
> TASK_INTERRUPTIBLE);
> +               else
> +                       return -EBUSY;
> +       }
> +
> +       return 0;
> +}
> +
>  static void sio_push(struct pcs_sockio * sio)
>  {
>         TRACE(PEER_FMT" flush \n", PEER_ARGS(sio->netio.parent));
> @@ -376,6 +393,17 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
>                         return;
>                 }
>
> +               set_bit(PCS_MSG_BUSY, &msg->flags);
> +
> +               /* pair with the smp_mb() between set PCS_MSG_ABORTED and 
> test PCS_MSG_BUSY */
> +               smp_mb();
> +
> +               /* Shouldn't abort a half sent message  */
> +               if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags)) && 
> !sio->write_offset) {
> +                       pcs_set_local_error(&msg->error, 
> PCS_ERR_CANCEL_REQUEST);
> +                       goto skip_send;
> +               }
> +
>                 /* TODO: cond resched here? */
>                 while (sio->write_offset < msg->size) {
>                         size_t left = msg->size - sio->write_offset;
> @@ -409,6 +437,10 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
>                                 return;
>                         }
>                 }
> +
> +skip_send:
> +               clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags);
> +
>                 list_del_init(&msg->list);
>                 sio->write_queue_len -= msg->size;
>
> @@ -634,7 +666,6 @@ struct pcs_msg * pcs_alloc_input_msg(struct pcs_sockio * 
> sio, int datalen)
>
>         msg = kmalloc(sizeof(struct pcs_msg) + datalen, GFP_NOIO);
>         if (msg) {
> -
>                 pcs_msg_io_init(msg);
>                 pcs_account_msg(sio, msg);
>                 msg->destructor = pcs_msg_input_destructor;
> @@ -713,6 +744,7 @@ struct pcs_msg * pcs_clone_msg(struct pcs_msg * msg)
>                 clone->destructor = pcs_io_msg_output_destructor;
>                 clone->private = msg;
>                 clone->get_iter = get_iter_clone;
> +               clone->flags = 0;
>         }
>         return clone;
>  }
> @@ -752,6 +784,7 @@ struct pcs_msg * pcs_cow_msg(struct pcs_msg * msg, int 
> copy_len)
>                 clone->_inline_len = (short)copy_len;
>                 memcpy(clone->_inline_buffer, msg_inline_head(msg), copy_len);
>                 clone->get_iter = get_iter_cow_clone;
> +               clone->flags = 0;
>         }
>         return clone;
>  }
> diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h
> index 09870b38cdad..b14e992a22d3 100644
> --- a/fs/fuse/kio/pcs/pcs_sock_io.h
> +++ b/fs/fuse/kio/pcs/pcs_sock_io.h
> @@ -35,6 +35,17 @@ struct pcs_api_channel
>         unsigned        msg_count;
>  };
>
> +/**
> + * pcs_msg flags
> + *
> + * PCS_MSG_BUSY:               set when the msg is under IO
> + * PCS_MSG_BORTED:  the msg was aborted.
> + */
> +enum pcs_msg_flag {
> +       PCS_MSG_BUSY,
> +       PCS_MSG_ABORTED,
> +};
> +
>  __pre_packed struct pcs_msg
>  {
>         struct __pre_aligned(16) {
> @@ -58,6 +69,8 @@ __pre_packed struct pcs_msg
>                 unsigned char   stage;
>                 abs_time_t      io_start_time;
>
> +               unsigned long flags;
> +
>                 struct hlist_node       kill_link;
>
>                 void            (*get_iter)(struct pcs_msg *, int offset, 
> struct iov_iter *it,
> @@ -181,6 +194,7 @@ static inline void iov_iter_get_kvec(struct iov_iter *i, 
> struct kvec *vec)
>  }
>  void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn);
>  void pcs_msg_sent(struct pcs_msg * msg);
> +int pcs_msg_abort(struct pcs_msg *msg, bool wait);
>
>  static inline void * msg_inline_head(struct pcs_msg * msg)
>  {
> --
> 2.39.5 (Apple Git-154)

_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel

Reply via email to