Traversing the list of requests to be aborted needs to be done under the krpc->lock, otherwise other threads could delete requests from the list, resulting in illegal memory access.
However, in the unlikely event that the msg being aborted is under I/O, krpc abort must wait until the I/O finishes. Obviously, waiting while holding krpc->lock is dangerous and unacceptable, but waiting on the msg without krpc->lock is also dangerous, as the msg could have been freed by the time we wake up. So in that case we simply drop the lock, sleep briefly, and retry the traversal from the start. However, if the msg is stuck at netio for a long period of time, we need to kill the connection. The timeout can be set via the module parameter 'pcs_krpc_abort_timeout'. Fixes: VSTOR-106209 https://virtuozzo.atlassian.net/browse/VSTOR-106209 Signed-off-by: Liu Kui <kui....@virtuozzo.com> fix debug --- fs/fuse/kio/pcs/pcs_krpc.c | 44 ++++++++++++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/fs/fuse/kio/pcs/pcs_krpc.c b/fs/fuse/kio/pcs/pcs_krpc.c index 3e404a2d0bde..9cc4e12b5eab 100644 --- a/fs/fuse/kio/pcs/pcs_krpc.c +++ b/fs/fuse/kio/pcs/pcs_krpc.c @@ -29,6 +29,10 @@ unsigned int pcs_krpc_use_thread = 1; module_param(pcs_krpc_use_thread, uint, 0644); MODULE_PARM_DESC(pcs_krpc_use_thread, "Offload creating the request to a thread"); +unsigned int pcs_krpc_abort_timeout = 10; +module_param(pcs_krpc_abort_timeout, uint, 0644); +MODULE_PARM_DESC(pcs_krpc_abort_timeout, "Timeout for krpc abort in milisecond"); + extern unsigned int pcs_krpc_version; struct kmem_cache *krpc_req_cachep; @@ -726,9 +730,10 @@ static int pcs_krpc_ioctl_send_msg(struct krpc_req *kreq) static int pcs_krpc_abort(struct pcs_krpc *krpc) { - struct krpc_req *kreq, *tmp; + struct krpc_req *kreq; struct krpc_completion *comp; struct pcs_msg *msg; + int retries = pcs_krpc_abort_timeout * 10; spin_lock(&krpc->lock); @@ -755,20 +760,47 @@ static int pcs_krpc_abort(struct pcs_krpc *krpc) } krpc->nr_completion = 0; - /* abort incompleted requests */ + /* + * Move incompleted requests to a separate queue and abort every one + * of them + */ list_splice_tail_init(&krpc->pending_queue, &krpc->dispose_queue); - spin_unlock(&krpc->lock); 
- list_for_each_entry_safe(kreq, tmp, &krpc->dispose_queue, link) { +retry: + list_for_each_entry(kreq, &krpc->dispose_queue, link) { + if (kreq->flags & KRPC_REQ_F_ABORTED) + continue; + kreq->flags |= KRPC_REQ_F_ABORTED; msg = &kreq->msg; /* * The msg isn't freed immediately here however the user buffer - * won't be accessed. + * won't be accessed. If in an unlikely event the msg is currently + * under I/O, we must wait until I/O completes. */ - pcs_msg_abort(msg, true); + if (pcs_msg_abort(msg, false) == -EBUSY) { + spin_unlock(&krpc->lock); + + if (--retries < 0) { + /* seems the msg is stuck at netio, kill the connection */ + mutex_lock(&krpc->rpc->mutex); + rpc_abort(krpc->rpc, 0, PCS_ERR_NET_ABORT); + mutex_unlock(&krpc->rpc->mutex); + return 0; + } + usleep_range(100, 200); + + /* + * The msg we're waiting for may have been completed and freed, + * so we must go through the list again under krpc->lock. + */ + spin_lock(&krpc->lock); + goto retry; + } } + spin_unlock(&krpc->lock); + return 0; } -- 2.39.5 (Apple Git-154) _______________________________________________ Devel mailing list Devel@openvz.org https://lists.openvz.org/mailman/listinfo/devel