The commit is pushed to "branch-rh9-5.14.0-427.44.1.vz9.80.x-ovz" and will 
appear at g...@bitbucket.org:openvz/vzkernel.git
after rh9-5.14.0-427.44.1.vz9.80.26
------>
commit 8ebd11fbbc4a5a29b7cc9ee56b5bef1e141cf0a3
Author: Liu Kui <kui....@virtuozzo.com>
Date:   Thu Apr 17 12:57:23 2025 +0800

    fs/fuse kio: fix krpc abort
    
    It's never a good idea to take the kernel rpc mutex in the ioctl
    path from userspace, because it could block the calling thread for
    a very long time, especially when kernel rpc is trying to establish
    a connection.
    
    Currently krpc abort needs to take the mutex because it wants to
    clean up all aborting msgs from kernel rpc, which is not easy to do
    without holding the mutex. So this approach is perhaps unfixable.
    
    So this patch tries a different approach for krpc abort. Two flag
    bits, PCS_MSG_BUSY and PCS_MSG_ABORTED, are introduced. Instead of
    removing aborting msgs from kernel rpc in krpc abort, it now only
    sets the PCS_MSG_ABORTED flag bit on the aborting msg, preventing
    netio from accessing the associated user buffers. Meanwhile it needs
    to wait if the PCS_MSG_BUSY flag bit is set, meaning the aborting msg
    is currently under I/O by netio. It's very unlikely that the
    PCS_MSG_BUSY flag bit stays set for a long time, so krpc abort is
    unlikely to be blocked for long.
    
    Fixes: #VSTOR-104248
    https://virtuozzo.atlassian.net/browse/VSTOR-104248
    
    Signed-off-by: Liu Kui <kui....@virtuozzo.com>
    Acked-by: Alexey Kuznetsov <kuz...@virtuozzo.com>
    
    Feature: fuse: kRPC - single RPC for kernel and userspace
---
 fs/fuse/kio/pcs/pcs_cs.c      |  1 +
 fs/fuse/kio/pcs/pcs_krpc.c    | 66 +++++++++++++++----------------------------
 fs/fuse/kio/pcs/pcs_rdma_io.c | 14 +++++++++
 fs/fuse/kio/pcs/pcs_rpc.c     |  2 ++
 fs/fuse/kio/pcs/pcs_sock_io.c | 35 ++++++++++++++++++++++-
 fs/fuse/kio/pcs/pcs_sock_io.h | 14 +++++++++
 6 files changed, 88 insertions(+), 44 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index ad398acb03ef..299ece862cd3 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -715,6 +715,7 @@ static void do_cs_submit(struct pcs_cs *cs, struct 
pcs_int_request *ireq)
        pcs_clear_error(&msg->error);
        msg->done = cs_sent;
        msg->get_iter = aligned_msg ? cs_get_data_aligned : cs_get_data;
+       msg->flags = 0;
 
        if ((map->state & PCS_MAP_DEAD) || (map->cs_list != csl)) {
                ireq->error.value = PCS_ERR_CSD_STALE_MAP;
diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
index 087d1d966e0d..3e404a2d0bde 100644
--- a/fs/fuse/kio/pcs/pcs_krpc.c
+++ b/fs/fuse/kio/pcs/pcs_krpc.c
@@ -155,6 +155,20 @@ struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct 
pcs_rpc_hdr *h)
 
        kreq = msg->private2;
 
+       set_bit(PCS_MSG_BUSY, &msg->flags);
+
+       /* pair with the smp_mb() between set PCS_MSG_ABORTED and test 
PCS_MSG_BUSY */
+       smp_mb();
+
+       if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags))) {
+               pcs_set_local_error(&msg->error, PCS_ERR_CANCEL_REQUEST);
+               pcs_msg_del_calendar(msg);
+               list_del(&msg->list);
+               msg->stage = PCS_MSG_STAGE_DONE;
+               msg->done(msg);
+               return NULL;
+       }
+
        resp = pcs_rpc_alloc_input_msg(ep, sizeof(struct pcs_rpc_hdr));
        if (!resp)
                return NULL;
@@ -256,6 +270,8 @@ void pcs_krpc_response_done(struct pcs_msg *msg)
 {
        struct krpc_req *kreq = msg->private2;
 
+       clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags);
+
        if (msg->rpc) {
                pcs_rpc_put(msg->rpc);
                msg->rpc = NULL;
@@ -579,6 +595,7 @@ static int kreq_make_sendmsg(struct krpc_req *kreq)
        msg->rpc = NULL;
        msg->done = pcs_krpc_msg_sent;
        msg->get_iter = krpc_msg_get_data;
+       msg->flags = 0;
 
        spin_lock(&krpc->lock);
        if (krpc->state != PCS_KRPC_STATE_CONNECTED ||
@@ -711,9 +728,7 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc)
 {
        struct krpc_req *kreq, *tmp;
        struct krpc_completion *comp;
-       struct pcs_rpc *ep = krpc->rpc;
        struct pcs_msg *msg;
-       int timeout = 1000;     /* 10 ms */
 
        spin_lock(&krpc->lock);
 
@@ -744,52 +759,16 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc)
        list_splice_tail_init(&krpc->pending_queue, &krpc->dispose_queue);
        spin_unlock(&krpc->lock);
 
-       /* nothing to be done */
-       if (list_empty(&krpc->dispose_queue))
-               return 0;
-
-       /* abort incomplete requests */
-       mutex_lock(&ep->mutex);
        list_for_each_entry_safe(kreq, tmp, &krpc->dispose_queue, link) {
                kreq->flags |= KRPC_REQ_F_ABORTED;
                msg = &kreq->msg;
-               /* if msg is cancelled, kreq will be removed from the queue */
-               pcs_rpc_cancel_msg(ep, msg);
+               /*
+                * The msg isn't freed immediately here however the user buffer
+                * won't be accessed.
+                */
+               pcs_msg_abort(msg, true);
        }
 
-       /*
-        * The krpc->dispose_queue should be empty if there are no requests in
-        * busy state. Otherwise wait until all busy requests complete. This
-        * should be a extremely rare case, therefore sleep is acceptable here.
-        *
-        * We cannot keep references to busy requests while waiting, because
-        * busy requests could have been freed.
-        */
-       while (!list_empty(&krpc->dispose_queue)) {
-               kreq = list_first_entry(&krpc->dispose_queue, struct krpc_req, 
link);
-               msg = &kreq->msg;
-
-               /* no longer busy and cancelled */
-               if (!pcs_rpc_cancel_msg(ep, msg))
-                       continue;
-
-               /* seems somthing wrong happened to hardware, abort the rpc */
-               if (timeout == 0) {
-                       rpc_abort(ep, 0, PCS_ERR_NET_ABORT);
-                       break;
-               }
-               mutex_unlock(&ep->mutex);
-
-               /* sleep 10 us */
-               udelay(10);
-               timeout--;
-
-               /* check again */
-               mutex_lock(&ep->mutex);
-       }
-
-       mutex_unlock(&ep->mutex);
-
        return 0;
 }
 
@@ -1117,6 +1096,7 @@ int pcs_krpc_connect(struct pcs_krpc_set *krpcs, 
PCS_NODE_ID_T *id)
        msg = &connect_req->msg;
        msg->size = 0;
        msg->timeout = 0;
+       msg->flags = 0;
        msg->rpc = NULL;
        msg->done = krpc_connect_done;
        pcs_clear_error(&msg->error);
diff --git a/fs/fuse/kio/pcs/pcs_rdma_io.c b/fs/fuse/kio/pcs/pcs_rdma_io.c
index d50f2c1e97e3..1d9e648d2636 100644
--- a/fs/fuse/kio/pcs/pcs_rdma_io.c
+++ b/fs/fuse/kio/pcs/pcs_rdma_io.c
@@ -154,6 +154,7 @@ static struct pcs_msg *rio_dequeue_reserved_msg(struct 
pcs_rdmaio *rio)
 static void rio_msg_sent(struct pcs_rdmaio *rio, struct rio_tx *tx, struct 
pcs_msg *msg, int done)
 {
        if (done) {
+               clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags);
                pcs_msg_sent(msg);
                msg->done(msg);
        } else {
@@ -621,6 +622,19 @@ static int rio_submit(struct pcs_rdmaio *rio, struct 
pcs_msg *msg, int type, u64
        int offset = 0;
        struct iov_iter it;
 
+       if (msg) {
+               set_bit(PCS_MSG_BUSY, &msg->flags);
+
+               /* pair with the smp_mb() between set PCS_MSG_ABORTED and test 
PCS_MSG_BUSY */
+               smp_mb();
+
+               if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags))) {
+                       pcs_set_local_error(&msg->error, 
PCS_ERR_CANCEL_REQUEST);
+                       rio_msg_sent(rio, tx, msg, 1);
+                       return 0;
+               }
+       }
+
        tx = RE_NULL(rio_get_tx(dev));
        if (!tx) {
                if (allow_again)
diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
index 71c2a3b54da7..f15d0c3fb7cd 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.c
+++ b/fs/fuse/kio/pcs/pcs_rpc.c
@@ -1374,6 +1374,7 @@ void pcs_rpc_init_input_msg(struct pcs_rpc * ep, struct 
pcs_msg * msg, int accou
        INIT_HLIST_NODE(&msg->kill_link);
        pcs_rpc_account_msg(ep, msg, account);
        msg->destructor = pcs_rpc_input_destructor;
+       msg->flags = 0;
 }
 
 struct pcs_msg * pcs_rpc_alloc_input_msg(struct pcs_rpc * ep, int datalen)
@@ -1408,6 +1409,7 @@ void pcs_rpc_init_output_msg(struct pcs_msg * msg)
        msg->rpc = NULL;
        INIT_HLIST_NODE(&msg->kill_link);
        msg->destructor = pcs_msg_output_destructor;
+       msg->flags = 0;
 }
 
 struct pcs_msg * pcs_rpc_alloc_output_msg(int datalen)
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.c b/fs/fuse/kio/pcs/pcs_sock_io.c
index 805b8f1e56b0..7eb231b7260d 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.c
+++ b/fs/fuse/kio/pcs/pcs_sock_io.c
@@ -35,6 +35,23 @@ void pcs_msg_sent(struct pcs_msg * msg)
        }
 }
 
+/* Mark @msg aborted; returns 0 once it is not under I/O, or -EBUSY if it is busy and !@wait. */
+int pcs_msg_abort(struct pcs_msg *msg, bool wait)
+{
+       set_bit(PCS_MSG_ABORTED, &msg->flags);
+       /* pair with the smp_mb() between set PCS_MSG_BUSY and test PCS_MSG_ABORTED */
+       smp_mb();
+
+       while (test_bit(PCS_MSG_BUSY, &msg->flags)) {
+               if (!wait)
+                       return -EBUSY;
+               /* Uninterruptible: with a pending signal an interruptible wait returns at once and this loop would spin. */
+               wait_on_bit(&msg->flags, PCS_MSG_BUSY, TASK_UNINTERRUPTIBLE);
+       }
+
+       return 0;
+}
+
 static void sio_push(struct pcs_sockio * sio)
 {
        TRACE(PEER_FMT" flush \n", PEER_ARGS(sio->netio.parent));
@@ -376,6 +393,17 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
                        return;
                }
 
+               set_bit(PCS_MSG_BUSY, &msg->flags);
+
+               /* pair with the smp_mb() between set PCS_MSG_ABORTED and test 
PCS_MSG_BUSY */
+               smp_mb();
+
+               /* Shouldn't abort a half sent message  */
+               if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags)) && 
!sio->write_offset) {
+                       pcs_set_local_error(&msg->error, 
PCS_ERR_CANCEL_REQUEST);
+                       goto skip_send;
+               }
+
                /* TODO: cond resched here? */
                while (sio->write_offset < msg->size) {
                        size_t left = msg->size - sio->write_offset;
@@ -409,6 +437,10 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
                                return;
                        }
                }
+
+skip_send:
+               clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags);
+
                list_del_init(&msg->list);
                sio->write_queue_len -= msg->size;
 
@@ -634,7 +666,6 @@ struct pcs_msg * pcs_alloc_input_msg(struct pcs_sockio * 
sio, int datalen)
 
        msg = kmalloc(sizeof(struct pcs_msg) + datalen, GFP_NOIO);
        if (msg) {
-
                pcs_msg_io_init(msg);
                pcs_account_msg(sio, msg);
                msg->destructor = pcs_msg_input_destructor;
@@ -713,6 +744,7 @@ struct pcs_msg * pcs_clone_msg(struct pcs_msg * msg)
                clone->destructor = pcs_io_msg_output_destructor;
                clone->private = msg;
                clone->get_iter = get_iter_clone;
+               clone->flags = 0;
        }
        return clone;
 }
@@ -752,6 +784,7 @@ struct pcs_msg * pcs_cow_msg(struct pcs_msg * msg, int 
copy_len)
                clone->_inline_len = (short)copy_len;
                memcpy(clone->_inline_buffer, msg_inline_head(msg), copy_len);
                clone->get_iter = get_iter_cow_clone;
+               clone->flags = 0;
        }
        return clone;
 }
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h
index 09870b38cdad..b14e992a22d3 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.h
+++ b/fs/fuse/kio/pcs/pcs_sock_io.h
@@ -35,6 +35,17 @@ struct pcs_api_channel
        unsigned        msg_count;
 };
 
+/**
+ * pcs_msg flags: bit numbers in pcs_msg.flags, used with set_bit()/test_bit()
+ *
+ * PCS_MSG_BUSY:    set while the msg is under I/O
+ * PCS_MSG_ABORTED: the msg was aborted (set by pcs_msg_abort())
+ */
+enum pcs_msg_flag {
+       PCS_MSG_BUSY,
+       PCS_MSG_ABORTED,
+};
+
 __pre_packed struct pcs_msg
 {
        struct __pre_aligned(16) {
@@ -58,6 +69,8 @@ __pre_packed struct pcs_msg
                unsigned char   stage;
                abs_time_t      io_start_time;
 
+               unsigned long flags;
+
                struct hlist_node       kill_link;
 
                void            (*get_iter)(struct pcs_msg *, int offset, 
struct iov_iter *it,
@@ -181,6 +194,7 @@ static inline void iov_iter_get_kvec(struct iov_iter *i, 
struct kvec *vec)
 }
 void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn);
 void pcs_msg_sent(struct pcs_msg * msg);
+int pcs_msg_abort(struct pcs_msg *msg, bool wait);
 
 static inline void * msg_inline_head(struct pcs_msg * msg)
 {
_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel

Reply via email to