Ack On Thu, Apr 17, 2025 at 1:11 PM Liu Kui <kui....@virtuozzo.com> wrote: > > It's never a good idea to take the mutex lock of kernel rpc > in the path of an ioctl from userspace, because it could block the calling > thread for a very long time, especially when kernel rpc is trying to > establish a connection. > > Currently it needs to take the mutex lock in krpc abort because it > wants to clean up all aborting msgs from kernel rpc, which is not easy > to do without taking the mutex lock. So this approach is perhaps > unfixable. > > So this patch tries a different approach for krpc abort. Two flag > bits, PCS_MSG_BUSY and PCS_MSG_ABORTED, are introduced. Instead of > removing aborting msgs from kernel rpc in krpc abort, it now only > sets the PCS_MSG_ABORTED flag bit on the aborting msg, preventing netio > from accessing the associated user buffers. Meanwhile it needs to wait > if the PCS_MSG_BUSY flag bit is set, meaning the aborting msg is > currently under I/O by netio. It's very unlikely that the PCS_MSG_BUSY > flag bit stays set for a long time, so krpc abort is unlikely to > block for a long time. > > Fixes: #VSTOR-104248 > https://virtuozzo.atlassian.net/browse/VSTOR-104248 > > Signed-off-by: Liu Kui <kui....@virtuozzo.com> > --- > fs/fuse/kio/pcs/pcs_cs.c | 1 + > fs/fuse/kio/pcs/pcs_krpc.c | 66 ++++++++++++----------------------- > fs/fuse/kio/pcs/pcs_rdma_io.c | 14 ++++++++ > fs/fuse/kio/pcs/pcs_rpc.c | 2 ++ > fs/fuse/kio/pcs/pcs_sock_io.c | 35 ++++++++++++++++++- > fs/fuse/kio/pcs/pcs_sock_io.h | 14 ++++++++ > 6 files changed, 88 insertions(+), 44 deletions(-) > > diff --git a/fs/fuse/kio/pcs/pcs_cs.c b/fs/fuse/kio/pcs/pcs_cs.c > index ad398acb03ef..299ece862cd3 100644 > --- a/fs/fuse/kio/pcs/pcs_cs.c > +++ b/fs/fuse/kio/pcs/pcs_cs.c > @@ -715,6 +715,7 @@ static void do_cs_submit(struct pcs_cs *cs, struct > pcs_int_request *ireq) > pcs_clear_error(&msg->error); > msg->done = cs_sent; > msg->get_iter = aligned_msg ? 
cs_get_data_aligned : cs_get_data; > + msg->flags = 0; > > if ((map->state & PCS_MAP_DEAD) || (map->cs_list != csl)) { > ireq->error.value = PCS_ERR_CSD_STALE_MAP; > diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c > index 087d1d966e0d..3e404a2d0bde 100644 > --- a/fs/fuse/kio/pcs/pcs_krpc.c > +++ b/fs/fuse/kio/pcs/pcs_krpc.c > @@ -155,6 +155,20 @@ struct pcs_msg *krpc_get_hdr(struct pcs_rpc *ep, struct > pcs_rpc_hdr *h) > > kreq = msg->private2; > > + set_bit(PCS_MSG_BUSY, &msg->flags); > + > + /* pair with the smp_mb() between set PCS_MSG_ABORTED and test > PCS_MSG_BUSY */ > + smp_mb(); > + > + if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags))) { > + pcs_set_local_error(&msg->error, PCS_ERR_CANCEL_REQUEST); > + pcs_msg_del_calendar(msg); > + list_del(&msg->list); > + msg->stage = PCS_MSG_STAGE_DONE; > + msg->done(msg); > + return NULL; > + } > + > resp = pcs_rpc_alloc_input_msg(ep, sizeof(struct pcs_rpc_hdr)); > if (!resp) > return NULL; > @@ -256,6 +270,8 @@ void pcs_krpc_response_done(struct pcs_msg *msg) > { > struct krpc_req *kreq = msg->private2; > > + clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags); > + > if (msg->rpc) { > pcs_rpc_put(msg->rpc); > msg->rpc = NULL; > @@ -579,6 +595,7 @@ static int kreq_make_sendmsg(struct krpc_req *kreq) > msg->rpc = NULL; > msg->done = pcs_krpc_msg_sent; > msg->get_iter = krpc_msg_get_data; > + msg->flags = 0; > > spin_lock(&krpc->lock); > if (krpc->state != PCS_KRPC_STATE_CONNECTED || > @@ -711,9 +728,7 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc) > { > struct krpc_req *kreq, *tmp; > struct krpc_completion *comp; > - struct pcs_rpc *ep = krpc->rpc; > struct pcs_msg *msg; > - int timeout = 1000; /* 10 ms */ > > spin_lock(&krpc->lock); > > @@ -744,52 +759,16 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc) > list_splice_tail_init(&krpc->pending_queue, &krpc->dispose_queue); > spin_unlock(&krpc->lock); > > - /* nothing to be done */ > - if (list_empty(&krpc->dispose_queue)) > - return 0; > - > 
- /* abort incomplete requests */ > - mutex_lock(&ep->mutex); > list_for_each_entry_safe(kreq, tmp, &krpc->dispose_queue, link) { > kreq->flags |= KRPC_REQ_F_ABORTED; > msg = &kreq->msg; > - /* if msg is cancelled, kreq will be removed from the queue */ > - pcs_rpc_cancel_msg(ep, msg); > + /* > + * The msg isn't freed immediately here however the user > buffer > + * won't be accessed. > + */ > + pcs_msg_abort(msg, true); > } > > - /* > - * The krpc->dispose_queue should be empty if there are no requests in > - * busy state. Otherwise wait until all busy requests complete. This > - * should be a extremely rare case, therefore sleep is acceptable > here. > - * > - * We cannot keep references to busy requests while waiting, because > - * busy requests could have been freed. > - */ > - while (!list_empty(&krpc->dispose_queue)) { > - kreq = list_first_entry(&krpc->dispose_queue, struct > krpc_req, link); > - msg = &kreq->msg; > - > - /* no longer busy and cancelled */ > - if (!pcs_rpc_cancel_msg(ep, msg)) > - continue; > - > - /* seems somthing wrong happened to hardware, abort the rpc */ > - if (timeout == 0) { > - rpc_abort(ep, 0, PCS_ERR_NET_ABORT); > - break; > - } > - mutex_unlock(&ep->mutex); > - > - /* sleep 10 us */ > - udelay(10); > - timeout--; > - > - /* check again */ > - mutex_lock(&ep->mutex); > - } > - > - mutex_unlock(&ep->mutex); > - > return 0; > } > > @@ -1117,6 +1096,7 @@ int pcs_krpc_connect(struct pcs_krpc_set *krpcs, > PCS_NODE_ID_T *id) > msg = &connect_req->msg; > msg->size = 0; > msg->timeout = 0; > + msg->flags = 0; > msg->rpc = NULL; > msg->done = krpc_connect_done; > pcs_clear_error(&msg->error); > diff --git a/fs/fuse/kio/pcs/pcs_rdma_io.c b/fs/fuse/kio/pcs/pcs_rdma_io.c > index d50f2c1e97e3..1d9e648d2636 100644 > --- a/fs/fuse/kio/pcs/pcs_rdma_io.c > +++ b/fs/fuse/kio/pcs/pcs_rdma_io.c > @@ -154,6 +154,7 @@ static struct pcs_msg *rio_dequeue_reserved_msg(struct > pcs_rdmaio *rio) > static void rio_msg_sent(struct pcs_rdmaio *rio, struct 
rio_tx *tx, struct > pcs_msg *msg, int done) > { > if (done) { > + clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags); > pcs_msg_sent(msg); > msg->done(msg); > } else { > @@ -621,6 +622,19 @@ static int rio_submit(struct pcs_rdmaio *rio, struct > pcs_msg *msg, int type, u64 > int offset = 0; > struct iov_iter it; > > + if (msg) { > + set_bit(PCS_MSG_BUSY, &msg->flags); > + > + /* pair with the smp_mb() between set PCS_MSG_ABORTED and > test PCS_MSG_BUSY */ > + smp_mb(); > + > + if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags))) { > + pcs_set_local_error(&msg->error, > PCS_ERR_CANCEL_REQUEST); > + rio_msg_sent(rio, tx, msg, 1); > + return 0; > + } > + } > + > tx = RE_NULL(rio_get_tx(dev)); > if (!tx) { > if (allow_again) > diff --git a/fs/fuse/kio/pcs/pcs_rpc.c b/fs/fuse/kio/pcs/pcs_rpc.c > index 71c2a3b54da7..f15d0c3fb7cd 100644 > --- a/fs/fuse/kio/pcs/pcs_rpc.c > +++ b/fs/fuse/kio/pcs/pcs_rpc.c > @@ -1374,6 +1374,7 @@ void pcs_rpc_init_input_msg(struct pcs_rpc * ep, struct > pcs_msg * msg, int accou > INIT_HLIST_NODE(&msg->kill_link); > pcs_rpc_account_msg(ep, msg, account); > msg->destructor = pcs_rpc_input_destructor; > + msg->flags = 0; > } > > struct pcs_msg * pcs_rpc_alloc_input_msg(struct pcs_rpc * ep, int datalen) > @@ -1408,6 +1409,7 @@ void pcs_rpc_init_output_msg(struct pcs_msg * msg) > msg->rpc = NULL; > INIT_HLIST_NODE(&msg->kill_link); > msg->destructor = pcs_msg_output_destructor; > + msg->flags = 0; > } > > struct pcs_msg * pcs_rpc_alloc_output_msg(int datalen) > diff --git a/fs/fuse/kio/pcs/pcs_sock_io.c b/fs/fuse/kio/pcs/pcs_sock_io.c > index 805b8f1e56b0..7eb231b7260d 100644 > --- a/fs/fuse/kio/pcs/pcs_sock_io.c > +++ b/fs/fuse/kio/pcs/pcs_sock_io.c > @@ -35,6 +35,23 @@ void pcs_msg_sent(struct pcs_msg * msg) > } > } > > +int pcs_msg_abort(struct pcs_msg *msg, bool wait) > +{ > + set_bit(PCS_MSG_ABORTED, &msg->flags); > + > + /* pair with the smp_mb() between set PCS_MSG_BUSY and test > PCS_MSG_ABORTED */ > + smp_mb(); > + > + while 
(test_bit(PCS_MSG_BUSY, &msg->flags)) { > + if (wait) > + wait_on_bit(&msg->flags, PCS_MSG_BUSY, > TASK_INTERRUPTIBLE); > + else > + return -EBUSY; > + } > + > + return 0; > +} > + > static void sio_push(struct pcs_sockio * sio) > { > TRACE(PEER_FMT" flush \n", PEER_ARGS(sio->netio.parent)); > @@ -376,6 +393,17 @@ static void pcs_sockio_send(struct pcs_sockio *sio) > return; > } > > + set_bit(PCS_MSG_BUSY, &msg->flags); > + > + /* pair with the smp_mb() between set PCS_MSG_ABORTED and > test PCS_MSG_BUSY */ > + smp_mb(); > + > + /* Shouldn't abort a half sent message */ > + if (unlikely(test_bit(PCS_MSG_ABORTED, &msg->flags)) && > !sio->write_offset) { > + pcs_set_local_error(&msg->error, > PCS_ERR_CANCEL_REQUEST); > + goto skip_send; > + } > + > /* TODO: cond resched here? */ > while (sio->write_offset < msg->size) { > size_t left = msg->size - sio->write_offset; > @@ -409,6 +437,10 @@ static void pcs_sockio_send(struct pcs_sockio *sio) > return; > } > } > + > +skip_send: > + clear_and_wake_up_bit(PCS_MSG_BUSY, &msg->flags); > + > list_del_init(&msg->list); > sio->write_queue_len -= msg->size; > > @@ -634,7 +666,6 @@ struct pcs_msg * pcs_alloc_input_msg(struct pcs_sockio * > sio, int datalen) > > msg = kmalloc(sizeof(struct pcs_msg) + datalen, GFP_NOIO); > if (msg) { > - > pcs_msg_io_init(msg); > pcs_account_msg(sio, msg); > msg->destructor = pcs_msg_input_destructor; > @@ -713,6 +744,7 @@ struct pcs_msg * pcs_clone_msg(struct pcs_msg * msg) > clone->destructor = pcs_io_msg_output_destructor; > clone->private = msg; > clone->get_iter = get_iter_clone; > + clone->flags = 0; > } > return clone; > } > @@ -752,6 +784,7 @@ struct pcs_msg * pcs_cow_msg(struct pcs_msg * msg, int > copy_len) > clone->_inline_len = (short)copy_len; > memcpy(clone->_inline_buffer, msg_inline_head(msg), copy_len); > clone->get_iter = get_iter_cow_clone; > + clone->flags = 0; > } > return clone; > } > diff --git a/fs/fuse/kio/pcs/pcs_sock_io.h b/fs/fuse/kio/pcs/pcs_sock_io.h > index 
09870b38cdad..b14e992a22d3 100644 > --- a/fs/fuse/kio/pcs/pcs_sock_io.h > +++ b/fs/fuse/kio/pcs/pcs_sock_io.h > @@ -35,6 +35,17 @@ struct pcs_api_channel > unsigned msg_count; > }; > > +/** > + * pcs_msg flags > + * > + * PCS_MSG_BUSY: set when the msg is under IO > + * PCS_MSG_ABORTED: the msg was aborted. > + */ > +enum pcs_msg_flag { > + PCS_MSG_BUSY, > + PCS_MSG_ABORTED, > +}; > + > __pre_packed struct pcs_msg > { > struct __pre_aligned(16) { > @@ -58,6 +69,8 @@ __pre_packed struct pcs_msg > unsigned char stage; > abs_time_t io_start_time; > > + unsigned long flags; > + > struct hlist_node kill_link; > > void (*get_iter)(struct pcs_msg *, int offset, > struct iov_iter *it, > @@ -181,6 +194,7 @@ static inline void iov_iter_get_kvec(struct iov_iter *i, > struct kvec *vec) > } > void pcs_sock_ioconn_destruct(struct pcs_ioconn *ioconn); > void pcs_msg_sent(struct pcs_msg * msg); > +int pcs_msg_abort(struct pcs_msg *msg, bool wait); > > static inline void * msg_inline_head(struct pcs_msg * msg) > { > -- > 2.39.5 (Apple Git-154)
_______________________________________________ Devel mailing list Devel@openvz.org https://lists.openvz.org/mailman/listinfo/devel