The commit is pushed to "branch-rh9-5.14.0-427.44.1.vz9.80.x-ovz" and will appear at g...@bitbucket.org:openvz/vzkernel.git
after rh9-5.14.0-427.44.1.vz9.80.26
------>
commit 8ebd11fbbc4a5a29b7cc9ee56b5bef1e141cf0a3
Author: Liu Kui <kui....@virtuozzo.com>
Date:   Thu Apr 17 12:57:23 2025 +0800
    fs/fuse kio: fix krpc abort

    It is never a good idea to take the kernel rpc mutex in the ioctl path
    from userspace, because it can block the calling thread for a very long
    time, especially while the kernel rpc is trying to establish a
    connection. Currently krpc abort has to take the mutex because it wants
    to clean up all aborting msgs from the kernel rpc, which is hard to do
    without holding the lock, so the problem cannot really be fixed within
    that approach.

    This patch therefore takes a different approach to krpc abort. Two flag
    bits, PCS_MSG_BUSY and PCS_MSG_ABORTED, are introduced. Instead of
    removing aborting msgs from the kernel rpc, krpc abort now only sets the
    PCS_MSG_ABORTED flag bit on the aborting msg, which prevents netio from
    accessing the associated user buffers. It only has to wait if the
    PCS_MSG_BUSY flag bit is set, meaning the aborting msg is currently
    under I/O by netio. PCS_MSG_BUSY is very unlikely to stay set for long,
    so krpc abort is unlikely to be blocked for long.

    Fixes: #VSTOR-104248
    https://virtuozzo.atlassian.net/browse/VSTOR-104248

    Signed-off-by: Liu Kui <kui....@virtuozzo.com>
    Acked-by: Alexey Kuznetsov <kuz...@virtuozzo.com>

    Feature: fuse: kRPC - single RPC for kernel and userspace
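For reference, the BUSY/ABORTED handshake described above can be sketched as a
stand-alone user-space program. This is only an analogue of the patch, not the
patch itself: C11 atomics and an explicit fence stand in for set_bit(),
smp_mb(), wait_on_bit() and clear_and_wake_up_bit(), and the names struct msg,
MSG_BUSY, MSG_ABORTED, msg_begin_io(), msg_end_io() and msg_abort() are
illustrative, not identifiers from the kernel code.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define MSG_BUSY    (1UL << 0)	/* msg is under I/O */
#define MSG_ABORTED (1UL << 1)	/* msg was aborted, don't touch its user buffers */

struct msg {
	atomic_ulong flags;
};

/* I/O side: claim the msg, then re-check for a concurrent abort. */
static bool msg_begin_io(struct msg *m)
{
	atomic_fetch_or(&m->flags, MSG_BUSY);
	atomic_thread_fence(memory_order_seq_cst);	/* pairs with the fence in msg_abort() */
	if (atomic_load(&m->flags) & MSG_ABORTED) {
		atomic_fetch_and(&m->flags, ~MSG_BUSY);	/* drop the claim */
		return false;				/* caller completes the msg with a cancel error */
	}
	return true;
}

/* I/O side: I/O finished; the kernel version also wakes waiters on the bit. */
static void msg_end_io(struct msg *m)
{
	atomic_fetch_and(&m->flags, ~MSG_BUSY);
}

/* Abort side: mark the msg, then wait until no I/O path holds it. */
static void msg_abort(struct msg *m)
{
	atomic_fetch_or(&m->flags, MSG_ABORTED);
	atomic_thread_fence(memory_order_seq_cst);	/* pairs with the fence in msg_begin_io() */
	while (atomic_load(&m->flags) & MSG_BUSY)
		;					/* the kernel version sleeps in wait_on_bit() */
}

int main(void)
{
	struct msg m = { .flags = 0 };

	if (msg_begin_io(&m)) {
		printf("I/O may touch the user buffers\n");
		msg_end_io(&m);
	}
	msg_abort(&m);
	printf("aborted, flags=0x%lx\n", atomic_load(&m.flags));
	return 0;
}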
---
 fs/fuse/kio/pcs/pcs_cs.c      |  1 +
 fs/fuse/kio/pcs/pcs_krpc.c    | 66 +++++++++++++++----------------------------
 fs/fuse/kio/pcs/pcs_rdma_io.c | 14 +++++++++
 fs/fuse/kio/pcs/pcs_rpc.c     |  2 ++
 fs/fuse/kio/pcs/pcs_sock_io.c | 35 ++++++++++++++++++++++-
 fs/fuse/kio/pcs/pcs_sock_io.h | 14 +++++++++
 6 files changed, 88 insertions(+), 44 deletions(-)

diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c
index ad398acb03ef..299ece862cd3 100644
--- a/fs/fuse/kio/pcs/pcs_cs.c
+++ b/fs/fuse/kio/pcs/pcs_cs.c
@@ -715,6 +715,7 @@ static void do_cs_submit(struct pcs_cs *cs, struct pcs_int_request *ireq)
 	pcs_clear_error(&msg->error);
 	msg->done = cs_sent;
 	msg->get_iter = aligned_msg ? cs_get_data_aligned : cs_get_data;
+	msg->flags = 0;
 
 	if ((map->state & PCS_MAP_DEAD) || (map->cs_list != csl)) {
 		ireq->error.value = PCS_ERR_CSD_STALE_MAP;
diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c
index 087d1d966e0d..3e404a2d0bde 100644
--- a/fs/fuse/kio/pcs/pcs_krpc.c
+++ b/fs/fuse/kio/pcs/pcs_krpc.c
@@ -155,6 +155,20 @@ struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct pcs_rpc_hdr *h)
 
 	kreq = msg->private2;
 
+	set_bit(PCS_MSG_BUSY, &msg->flags);
+
+	/* pair with the smp_mb() between set PCS_MSG_ABORTED and test PCS_MSG_BUSY */
+	smp_mb();
+
+	if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags))) {
+		pcs_set_local_error(&msg->error, PCS_ERR_CANCEL_REQUEST);
+		pcs_msg_del_calendar(msg);
+		list_del(&msg->list);
+		msg->stage = PCS_MSG_STAGE_DONE;
+		msg->done(msg);
+		return NULL;
+	}
+
 	resp = pcs_rpc_alloc_input_msg(ep, sizeof(struct pcs_rpc_hdr));
 	if (!resp)
 		return NULL;
@@ -256,6 +270,8 @@ void pcs_krpc_response_done(struct pcs_msg *msg)
 {
 	struct krpc_req *kreq = msg->private2;
 
+	clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags);
+
 	if (msg->rpc) {
 		pcs_rpc_put(msg->rpc);
 		msg->rpc = NULL;
@@ -579,6 +595,7 @@ static int kreq_make_sendmsg(struct krpc_req *kreq)
 	msg->rpc = NULL;
 	msg->done = pcs_krpc_msg_sent;
 	msg->get_iter = krpc_msg_get_data;
+	msg->flags = 0;
 
 	spin_lock(&krpc->lock);
 	if (krpc->state != PCS_KRPC_STATE_CONNECTED ||
@@ -711,9 +728,7 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc)
 {
 	struct krpc_req *kreq, *tmp;
 	struct krpc_completion *comp;
-	struct pcs_rpc *ep = krpc->rpc;
 	struct pcs_msg *msg;
-	int timeout = 1000; /* 10 ms */
 
 	spin_lock(&krpc->lock);
@@ -744,52 +759,16 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc)
 	list_splice_tail_init(&krpc->pending_queue, &krpc->dispose_queue);
 	spin_unlock(&krpc->lock);
 
-	/* nothing to be done */
-	if (list_empty(&krpc->dispose_queue))
-		return 0;
-
-	/* abort incomplete requests */
-	mutex_lock(&ep->mutex);
 	list_for_each_entry_safe(kreq, tmp, &krpc->dispose_queue, link) {
 		kreq->flags |= KRPC_REQ_F_ABORTED;
 		msg = &kreq->msg;
-		/* if msg is cancelled, kreq will be removed from the queue */
-		pcs_rpc_cancel_msg(ep, msg);
+		/*
+		 * The msg isn't freed immediately here however the user buffer
+		 * won't be accessed.
+		 */
+		pcs_msg_abort(msg, true);
 	}
 
-	/*
-	 * The krpc->dispose_queue should be empty if there are no requests in
-	 * busy state. Otherwise wait until all busy requests complete. This
-	 * should be a extremely rare case, therefore sleep is acceptable here.
-	 *
-	 * We cannot keep references to busy requests while waiting, because
-	 * busy requests could have been freed.
-	 */
-	while (!list_empty(&krpc->dispose_queue)) {
-		kreq = list_first_entry(&krpc->dispose_queue, struct krpc_req, link);
-		msg = &kreq->msg;
-
-		/* no longer busy and cancelled */
-		if (!pcs_rpc_cancel_msg(ep, msg))
-			continue;
-
-		/* seems somthing wrong happened to hardware, abort the rpc */
-		if (timeout == 0) {
-			rpc_abort(ep, 0, PCS_ERR_NET_ABORT);
-			break;
-		}
-		mutex_unlock(&ep->mutex);
-
-		/* sleep 10 us */
-		udelay(10);
-		timeout--;
-
-		/* check again */
-		mutex_lock(&ep->mutex);
-	}
-
-	mutex_unlock(&ep->mutex);
-
 	return 0;
 }
@@ -1117,6 +1096,7 @@ int pcs_krpc_connect(struct pcs_krpc_set *krpcs, PCS_NODE_ID_T *id)
 	msg = &connect_req->msg;
 	msg->size = 0;
 	msg->timeout = 0;
+	msg->flags = 0;
 	msg->rpc = NULL;
 	msg->done = krpc_connect_done;
 	pcs_clear_error(&msg->error);
diff --git a/fs/fuse/kio/pcs/pcs_rdma_io.c b/fs/fuse/kio/pcs/pcs_rdma_io.c
index d50f2c1e97e3..1d9e648d2636 100644
--- a/fs/fuse/kio/pcs/pcs_rdma_io.c
+++ b/fs/fuse/kio/pcs/pcs_rdma_io.c
@@ -154,6 +154,7 @@ static struct pcs_msg *rio_dequeue_reserved_msg(struct pcs_rdmaio *rio)
 static void rio_msg_sent(struct pcs_rdmaio *rio, struct rio_tx *tx, struct pcs_msg *msg, int done)
 {
 	if (done) {
+		clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags);
 		pcs_msg_sent(msg);
 		msg->done(msg);
 	} else {
@@ -621,6 +622,19 @@ static int rio_submit(struct pcs_rdmaio *rio, struct pcs_msg *msg, int type, u64
 	int offset = 0;
 	struct iov_iter it;
 
+	if (msg) {
+		set_bit(PCS_MSG_BUSY, &msg->flags);
+
+		/* pair with the smp_mb() between set PCS_MSG_ABORTED and test PCS_MSG_BUSY */
+		smp_mb();
+
+		if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags))) {
+			pcs_set_local_error(&msg->error, PCS_ERR_CANCEL_REQUEST);
+			rio_msg_sent(rio, tx, msg, 1);
+			return 0;
+		}
+	}
+
 	tx = RE_NULL(rio_get_tx(dev));
 	if (!tx) {
 		if (allow_again)
diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c
index 71c2a3b54da7..f15d0c3fb7cd 100644
--- a/fs/fuse/kio/pcs/pcs_rpc.c
+++ b/fs/fuse/kio/pcs/pcs_rpc.c
@@ -1374,6 +1374,7 @@ void pcs_rpc_init_input_msg(struct pcs_rpc * ep, struct pcs_msg * msg, int accou
 	INIT_HLIST_NODE(&msg->kill_link);
 	pcs_rpc_account_msg(ep, msg, account);
 	msg->destructor = pcs_rpc_input_destructor;
+	msg->flags = 0;
 }
 
 struct pcs_msg * pcs_rpc_alloc_input_msg(struct pcs_rpc * ep, int datalen)
@@ -1408,6 +1409,7 @@ void pcs_rpc_init_output_msg(struct pcs_msg * msg)
 	msg->rpc = NULL;
 	INIT_HLIST_NODE(&msg->kill_link);
 	msg->destructor = pcs_msg_output_destructor;
+	msg->flags = 0;
 }
 
 struct pcs_msg * pcs_rpc_alloc_output_msg(int datalen)
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.c b/fs/fuse/kio/pcs/pcs_sock_io.c
index 805b8f1e56b0..7eb231b7260d 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.c
+++ b/fs/fuse/kio/pcs/pcs_sock_io.c
@@ -35,6 +35,23 @@ void pcs_msg_sent(struct pcs_msg * msg)
 	}
 }
 
+int pcs_msg_abort(struct pcs_msg *msg, bool wait)
+{
+	set_bit(PCS_MSG_ABORTED, &msg->flags);
+
+	/* pair with the smp_mb() between set PCS_MSG_BUSY and test PCS_MSG_ABORTED */
+	smp_mb();
+
+	while (test_bit(PCS_MSG_BUSY, &msg->flags)) {
+		if (wait)
+			wait_on_bit(&msg->flags, PCS_MSG_BUSY, TASK_INTERRUPTIBLE);
+		else
+			return -EBUSY;
+	}
+
+	return 0;
+}
+
 static void sio_push(struct pcs_sockio * sio)
 {
 	TRACE(PEER_FMT" flush \n", PEER_ARGS(sio->netio.parent));
@@ -376,6 +393,17 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
 			return;
 		}
 
+		set_bit(PCS_MSG_BUSY, &msg->flags);
+
+		/* pair with the smp_mb() between set PCS_MSG_ABORTED and test PCS_MSG_BUSY */
+		smp_mb();
+
+		/* Shouldn't abort a half sent message */
+		if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags)) && !sio->write_offset) {
+			pcs_set_local_error(&msg->error, PCS_ERR_CANCEL_REQUEST);
+			goto skip_send;
+		}
+
 		/* TODO: cond resched here? */
 		while (sio->write_offset < msg->size) {
 			size_t left = msg->size - sio->write_offset;
@@ -409,6 +437,10 @@ static void pcs_sockio_send(struct pcs_sockio *sio)
 				return;
 			}
 		}
+
+skip_send:
+		clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags);
+
 		list_del_init(&msg->list);
 		sio->write_queue_len -= msg->size;
@@ -634,7 +666,6 @@ struct pcs_msg * pcs_alloc_input_msg(struct pcs_sockio * sio, int datalen)
 	msg = kmalloc(sizeof(struct pcs_msg) + datalen, GFP_NOIO);
 	if (msg) {
-		pcs_msg_io_init(msg);
 		pcs_account_msg(sio, msg);
 		msg->destructor = pcs_msg_input_destructor;
@@ -713,6 +744,7 @@ struct pcs_msg * pcs_clone_msg(struct pcs_msg * msg)
 		clone->destructor = pcs_io_msg_output_destructor;
 		clone->private = msg;
 		clone->get_iter = get_iter_clone;
+		clone->flags = 0;
 	}
 	return clone;
 }
@@ -752,6 +784,7 @@ struct pcs_msg * pcs_cow_msg(struct pcs_msg * msg, int copy_len)
 		clone->_inline_len = (short)copy_len;
 		memcpy(clone->_inline_buffer, msg_inline_head(msg), copy_len);
 		clone->get_iter = get_iter_cow_clone;
+		clone->flags = 0;
 	}
 	return clone;
 }
diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h
index 09870b38cdad..b14e992a22d3 100644
--- a/fs/fuse/kio/pcs/pcs_sock_io.h
+++ b/fs/fuse/kio/pcs/pcs_sock_io.h
@@ -35,6 +35,17 @@ struct pcs_api_channel
 	unsigned	msg_count;
 };
 
+/**
+ * pcs_msg flags
+ *
+ * PCS_MSG_BUSY: set when the msg is under IO
+ * PCS_MSG_ABORTED: the msg was aborted.
+ */
+enum pcs_msg_flag {
+	PCS_MSG_BUSY,
+	PCS_MSG_ABORTED,
+};
+
 __pre_packed struct pcs_msg
 {
 	struct __pre_aligned(16) {
@@ -58,6 +69,8 @@ __pre_packed struct pcs_msg
 	unsigned char	stage;
 	abs_time_t	io_start_time;
 
+	unsigned long	flags;
+
 	struct hlist_node	kill_link;
 
 	void		(*get_iter)(struct pcs_msg *, int offset, struct iov_iter *it,
@@ -181,6 +194,7 @@ static inline void iov_iter_get_kvec(struct iov_iter *i, struct kvec *vec)
 }
 void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn);
 void pcs_msg_sent(struct pcs_msg * msg);
+int pcs_msg_abort(struct pcs_msg *msg, bool wait);
 
 static inline void * msg_inline_head(struct pcs_msg * msg)
 {

_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel