The patch implements AK's ideas about enhancing mt-fuse, which concern handling incoming requests in the kernel. Currently, mainline puts all of them in a single (global) queue, letting each userspace thread pick up an arbitrary "first" request from the queue. AK suggests introducing a CPU_core-to-FUSE_device mapping in the kernel. Then, if a request originates on CPU_core==i, it is put into a queue bound to FUSE_device[i], and userspace waiting for events on this device is woken up.
Also, AK suggests to enable this technique only for FUSE_READ-s, putting all other requests to some "main" FUSE_device queue. In turn, the userspace is supposed to epoll() all FUSE_devices in some "main" thread, passing the task of replying to a FUSE_READ request to some proper "auxiliary" thread (the thread will be chosen based on the epoll() results). Signed-off-by: Maxim Patlasov <[email protected]> --- fs/fuse/control.c | 17 +++++++++- fs/fuse/cuse.c | 6 +++- fs/fuse/dev.c | 88 +++++++++++++++++++++++++++++++---------------------- fs/fuse/file.c | 14 +++++--- fs/fuse/fuse_i.h | 22 ++++++++++--- fs/fuse/inode.c | 30 ++++++++++++++---- 6 files changed, 122 insertions(+), 55 deletions(-) diff --git a/fs/fuse/control.c b/fs/fuse/control.c index c35a69b..1461e58 100644 --- a/fs/fuse/control.c +++ b/fs/fuse/control.c @@ -281,7 +281,7 @@ static int fuse_conn_seq_open(struct file *filp, int list_id) fcp->conn = conn; switch (list_id) { case FUSE_PENDING_REQ: - fcp->req_list = &conn->iq.pending; + fcp->req_list = &conn->main_iq.pending; break; #if 0 case FUSE_PROCESSING_REQ: @@ -400,10 +400,23 @@ static const struct file_operations fuse_conn_files_ops = { static int fuse_conn_show(struct seq_file *sf, void *v) { struct fuse_conn *fc = sf->private; + struct fuse_dev *fud; + int n_total = 0; + int n_active = 0; + + spin_lock(&fc->lock); + list_for_each_entry(fud, &fc->devices, entry) { + struct fuse_iqueue *fiq = fud->fiq; + if (waitqueue_active(&fiq->waitq)) + n_active++; + n_total++; + } + spin_unlock(&fc->lock); + seq_printf(sf, "Connected: %d\n", fc->connected); seq_printf(sf, "Initialized: %d\n", fc->initialized); seq_printf(sf, "Blocked: %d\n", fc->blocked); - seq_printf(sf, "WQ active: %d\n", waitqueue_active(&fc->iq.waitq)); + seq_printf(sf, "WQ active: %d of %d\n", n_active, n_total); seq_printf(sf, "Blocked_wq active: %d\n", waitqueue_active(&fc->blocked_waitq)); seq_printf(sf, "num_background: %d\n", fc->num_background); seq_printf(sf, "num_waiting: %d\n", 
atomic_read(&fc->num_waiting)); diff --git a/fs/fuse/cuse.c b/fs/fuse/cuse.c index 9935d02..b705461 100644 --- a/fs/fuse/cuse.c +++ b/fs/fuse/cuse.c @@ -503,7 +503,11 @@ static int cuse_channel_open(struct inode *inode, struct file *file) if (!cc) return -ENOMEM; - fuse_conn_init(&cc->fc); + rc = fuse_conn_init(&cc->fc); + if (rc) { + kfree(cc); + return rc; + } fud = fuse_dev_alloc(&cc->fc); if (!fud) { diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c index 44cf447..08c825e 100644 --- a/fs/fuse/dev.c +++ b/fs/fuse/dev.c @@ -36,7 +36,8 @@ static struct fuse_dev *fuse_get_dev(struct file *file) return ACCESS_ONCE(file->private_data); } -static void fuse_request_init(struct fuse_req *req, struct page **pages, +static void fuse_request_init(struct fuse_conn *fc, + struct fuse_req *req, struct page **pages, struct fuse_page_desc *page_descs, unsigned npages) { @@ -50,10 +51,12 @@ static void fuse_request_init(struct fuse_req *req, struct page **pages, req->pages = pages; req->page_descs = page_descs; req->max_pages = npages; + req->fiq = &fc->main_iq; __set_bit(FR_PENDING, &req->flags); } -static struct fuse_req *__fuse_request_alloc(unsigned npages, gfp_t flags) +static struct fuse_req *__fuse_request_alloc(struct fuse_conn *fc, + unsigned npages, gfp_t flags) { struct fuse_req *req = kmem_cache_alloc(fuse_req_cachep, flags); if (req) { @@ -76,20 +79,20 @@ static struct fuse_req *__fuse_request_alloc(unsigned npages, gfp_t flags) return NULL; } - fuse_request_init(req, pages, page_descs, npages); + fuse_request_init(fc, req, pages, page_descs, npages); } return req; } -struct fuse_req *fuse_request_alloc(unsigned npages) +struct fuse_req *fuse_request_alloc(struct fuse_conn *fc, unsigned npages) { - return __fuse_request_alloc(npages, GFP_KERNEL); + return __fuse_request_alloc(fc, npages, GFP_KERNEL); } EXPORT_SYMBOL_GPL(fuse_request_alloc); -struct fuse_req *fuse_request_alloc_nofs(unsigned npages) +struct fuse_req *fuse_request_alloc_nofs(struct fuse_conn *fc, unsigned 
npages) { - return __fuse_request_alloc(npages, GFP_NOFS); + return __fuse_request_alloc(fc, npages, GFP_NOFS); } void fuse_request_free(struct fuse_req *req) @@ -156,7 +159,7 @@ static struct fuse_req *__fuse_get_req(struct fuse_conn *fc, unsigned npages, if (fc->conn_error) goto out; - req = fuse_request_alloc(npages); + req = fuse_request_alloc(fc, npages); err = -ENOMEM; if (!req) { if (for_background) @@ -223,7 +226,7 @@ static void put_reserved_req(struct fuse_conn *fc, struct fuse_req *req) struct fuse_file *ff = file->private_data; spin_lock(&fc->lock); - fuse_request_init(req, req->pages, req->page_descs, req->max_pages); + fuse_request_init(fc, req, req->pages, req->page_descs, req->max_pages); BUG_ON(ff->reserved_req); ff->reserved_req = req; wake_up_all(&fc->reserved_req_waitq); @@ -253,7 +256,7 @@ struct fuse_req *fuse_get_req_nofail_nopages(struct fuse_conn *fc, wait_event(fc->blocked_waitq, fc->initialized); /* Matches smp_wmb() in fuse_set_initialized() */ smp_rmb(); - req = fuse_request_alloc(0); + req = fuse_request_alloc(fc, 0); if (!req) req = get_reserved_req(fc, file); @@ -318,7 +321,7 @@ static void queue_request(struct fuse_iqueue *fiq, struct fuse_req *req) void fuse_queue_forget(struct fuse_conn *fc, struct fuse_forget_link *forget, u64 nodeid, u64 nlookup) { - struct fuse_iqueue *fiq = &fc->iq; + struct fuse_iqueue *fiq = &fc->main_iq; forget->forget_one.nodeid = nodeid; forget->forget_one.nlookup = nlookup; @@ -335,12 +338,11 @@ void fuse_queue_forget(struct fuse_conn *fc, struct fuse_forget_link *forget, spin_unlock(&fiq->waitq.lock); } -static void flush_bg_queue(struct fuse_conn *fc) +static void flush_bg_queue(struct fuse_conn *fc, struct fuse_iqueue *fiq) { while (fc->active_background < fc->max_background && !list_empty(&fc->bg_queue)) { struct fuse_req *req; - struct fuse_iqueue *fiq = &fc->iq; req = list_entry(fc->bg_queue.next, struct fuse_req, list); list_del(&req->list); @@ -362,7 +364,7 @@ static void flush_bg_queue(struct 
fuse_conn *fc) */ static void request_end(struct fuse_conn *fc, struct fuse_req *req) { - struct fuse_iqueue *fiq = &fc->iq; + struct fuse_iqueue *fiq = req->fiq; if (test_and_set_bit(FR_FINISHED, &req->flags)) return; @@ -389,7 +391,7 @@ static void request_end(struct fuse_conn *fc, struct fuse_req *req) } fc->num_background--; fc->active_background--; - flush_bg_queue(fc); + flush_bg_queue(fc, fiq); spin_unlock(&fc->lock); } wake_up(&req->waitq); @@ -415,7 +417,7 @@ static void queue_interrupt(struct fuse_iqueue *fiq, struct fuse_req *req) static void request_wait_answer(struct fuse_conn *fc, struct fuse_req *req) { - struct fuse_iqueue *fiq = &fc->iq; + struct fuse_iqueue *fiq = req->fiq; int err; if (!fc->no_interrupt) { @@ -462,7 +464,7 @@ static void request_wait_answer(struct fuse_conn *fc, struct fuse_req *req) static void __fuse_request_send(struct fuse_conn *fc, struct fuse_req *req, struct fuse_file *ff) { - struct fuse_iqueue *fiq = &fc->iq; + struct fuse_iqueue *fiq = req->fiq; BUG_ON(test_bit(FR_BACKGROUND, &req->flags)); spin_lock(&fiq->waitq.lock); @@ -511,6 +513,8 @@ EXPORT_SYMBOL_GPL(fuse_request_send); void fuse_request_send_background_locked(struct fuse_conn *fc, struct fuse_req *req) { + struct fuse_iqueue *fiq = req->fiq; + BUG_ON(!test_bit(FR_BACKGROUND, &req->flags)); if (!test_bit(FR_WAITING, &req->flags)) { __set_bit(FR_WAITING, &req->flags); @@ -526,7 +530,7 @@ void fuse_request_send_background_locked(struct fuse_conn *fc, set_bdi_congested(&fc->bdi, BLK_RW_ASYNC); } list_add_tail(&req->list, &fc->bg_queue); - flush_bg_queue(fc); + flush_bg_queue(fc, fiq); } void fuse_request_send_background(struct fuse_conn *fc, struct fuse_req *req) @@ -557,7 +561,7 @@ static int fuse_request_send_notify_reply(struct fuse_conn *fc, struct fuse_req *req, u64 unique) { int err = -ENODEV; - struct fuse_iqueue *fiq = &fc->iq; + struct fuse_iqueue *fiq = req->fiq; __clear_bit(FR_ISREPLY, &req->flags); req->in.h.unique = unique; @@ -1207,7 +1211,7 @@ static 
ssize_t fuse_dev_do_read(struct fuse_dev *fud, struct file *file, { ssize_t err; struct fuse_conn *fc = fud->fc; - struct fuse_iqueue *fiq = &fc->iq; + struct fuse_iqueue *fiq = fud->fiq; struct fuse_pqueue *fpq = &fud->pq; struct fuse_req *req; struct fuse_in *in; @@ -1918,7 +1922,7 @@ static ssize_t fuse_dev_do_write(struct fuse_dev *fud, if (oh.error == -ENOSYS) fc->no_interrupt = 1; else if (oh.error == -EAGAIN) - queue_interrupt(&fc->iq, req); + queue_interrupt(req->fiq, req); fuse_copy_finish(cs); return nbytes; @@ -2058,7 +2062,7 @@ static unsigned fuse_dev_poll(struct file *file, poll_table *wait) if (!fud) return POLLERR; - fiq = &fud->fc->iq; + fiq = fud->fiq; poll_wait(file, &fiq->waitq, wait); spin_lock(&fiq->waitq.lock); @@ -2103,6 +2107,22 @@ static void end_polls(struct fuse_conn *fc) } } +void fuse_abort_iqueue(struct fuse_iqueue *fiq, struct list_head *to_end) +{ + struct fuse_req *req; + + spin_lock(&fiq->waitq.lock); + fiq->connected = 0; + list_splice_init(&fiq->pending, to_end); + list_for_each_entry(req, to_end, list) + clear_bit(FR_PENDING, &req->flags); + while (forget_pending(fiq)) + kfree(dequeue_forget(fiq, 1, NULL)); + wake_up_all_locked(&fiq->waitq); + spin_unlock(&fiq->waitq.lock); + kill_fasync(&fiq->fasync, SIGIO, POLL_IN); +} + /* * Abort all requests. 
* @@ -2123,7 +2143,7 @@ static void end_polls(struct fuse_conn *fc) */ void fuse_abort_conn(struct fuse_conn *fc) { - struct fuse_iqueue *fiq = &fc->iq; + int cpu; spin_lock(&fc->lock); if (fc->connected) { @@ -2154,18 +2174,14 @@ void fuse_abort_conn(struct fuse_conn *fc) spin_unlock(&fpq->lock); } fc->max_background = UINT_MAX; - flush_bg_queue(fc); + for_each_online_cpu(cpu) + flush_bg_queue(fc, per_cpu_ptr(fc->iqs, cpu)); + flush_bg_queue(fc, &fc->main_iq); + + for_each_online_cpu(cpu) + fuse_abort_iqueue(per_cpu_ptr(fc->iqs, cpu), &to_end2); + fuse_abort_iqueue(&fc->main_iq, &to_end2); - spin_lock(&fiq->waitq.lock); - fiq->connected = 0; - list_splice_init(&fiq->pending, &to_end2); - list_for_each_entry(req, &to_end2, list) - clear_bit(FR_PENDING, &req->flags); - while (forget_pending(fiq)) - kfree(dequeue_forget(fiq, 1, NULL)); - wake_up_all_locked(&fiq->waitq); - spin_unlock(&fiq->waitq.lock); - kill_fasync(&fiq->fasync, SIGIO, POLL_IN); end_polls(fc); wake_up_all(&fc->blocked_waitq); spin_unlock(&fc->lock); @@ -2195,7 +2211,7 @@ int fuse_dev_release(struct inode *inode, struct file *file) end_requests(fc, &fpq->processing); /* Are we the last open device? 
*/ if (atomic_dec_and_test(&fc->dev_count)) { - WARN_ON(fc->iq.fasync != NULL); + WARN_ON(fud->fiq->fasync != NULL); fuse_abort_conn(fc); } fuse_dev_free(fud); @@ -2212,7 +2228,7 @@ static int fuse_dev_fasync(int fd, struct file *file, int on) return -EPERM; /* No locking - fasync_helper does its own locking */ - return fasync_helper(fd, file, on, &fud->fc->iq.fasync); + return fasync_helper(fd, file, on, &fud->fiq->fasync); } static int fuse_device_clone(struct fuse_conn *fc, struct file *new) diff --git a/fs/fuse/file.c b/fs/fuse/file.c index 3a5388e..f00d05b 100644 --- a/fs/fuse/file.c +++ b/fs/fuse/file.c @@ -80,7 +80,7 @@ struct fuse_file *fuse_file_alloc(struct fuse_conn *fc) ff->ff_state = 0; ff->fc = fc; - ff->reserved_req = fuse_request_alloc(0); + ff->reserved_req = fuse_request_alloc(fc, 0); if (unlikely(!ff->reserved_req)) { kfree(ff); return NULL; @@ -770,8 +770,12 @@ void fuse_read_fill(struct fuse_req *req, struct file *file, loff_t pos, req->out.numargs = 1; req->out.args[0].size = count; - if (opcode == FUSE_READ) + if (opcode == FUSE_READ) { + struct fuse_iqueue *fiq = __this_cpu_ptr(ff->fc->iqs); + if (fiq->handled_by_fud) + req->fiq = fiq; req->inode = file->f_dentry->d_inode; + } } static void fuse_release_user_pages(struct fuse_req *req, int write) @@ -2003,7 +2007,7 @@ static int fuse_writepage_locked(struct page *page, if (test_set_page_writeback(page)) BUG(); - req = fuse_request_alloc_nofs(1); + req = fuse_request_alloc_nofs(fc, 1); if (!req) goto err; @@ -2247,7 +2251,7 @@ static int fuse_writepages_fill(struct page *page, } data->req = req = - fuse_request_alloc_nofs(FUSE_MAX_PAGES_PER_REQ); + fuse_request_alloc_nofs(fc, FUSE_MAX_PAGES_PER_REQ); if (!req) { unlock_page(page); return -ENOMEM; @@ -2321,7 +2325,7 @@ static int fuse_writepages(struct address_space *mapping, fuse_release_ff(inode, ff); data.inode = inode; - data.req = fuse_request_alloc_nofs(FUSE_MAX_PAGES_PER_REQ); + data.req = fuse_request_alloc_nofs(fc, 
FUSE_MAX_PAGES_PER_REQ); err = -ENOMEM; if (!data.req) goto out; diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h index 77230c3..6a82ced 100644 --- a/fs/fuse/fuse_i.h +++ b/fs/fuse/fuse_i.h @@ -407,12 +407,18 @@ struct fuse_req { /** Request is stolen from fuse_file->reserved_req */ struct file *stolen_file; + + /** Request will be handled by fud pointing to this fiq */ + struct fuse_iqueue *fiq; }; struct fuse_iqueue { /** Connection established */ unsigned connected; + /** # of fuds pointing to this fiq */ + int handled_by_fud; + /** Readers of the connection are waiting on this */ wait_queue_head_t waitq; @@ -457,6 +463,9 @@ struct fuse_dev { /** Fuse connection for this device */ struct fuse_conn *fc; + /** Input queue */ + struct fuse_iqueue *fiq; + /** Processing queue */ struct fuse_pqueue pq; @@ -499,8 +508,11 @@ struct fuse_conn { /** Maximum write size */ unsigned max_write; - /** Input queue */ - struct fuse_iqueue iq; + /** Main input queue */ + struct fuse_iqueue main_iq; + + /** Per-cpu input queues */ + struct fuse_iqueue __percpu *iqs; /** The next unique kernel file handle */ u64 khctr; @@ -818,9 +830,9 @@ void fuse_ctl_cleanup(void); /** * Allocate a request */ -struct fuse_req *fuse_request_alloc(unsigned npages); +struct fuse_req *fuse_request_alloc(struct fuse_conn *fc, unsigned npages); -struct fuse_req *fuse_request_alloc_nofs(unsigned npages); +struct fuse_req *fuse_request_alloc_nofs(struct fuse_conn *fc, unsigned npages); /** * Free a request @@ -898,7 +910,7 @@ struct fuse_conn *fuse_conn_get(struct fuse_conn *fc); /** * Initialize fuse_conn */ -void fuse_conn_init(struct fuse_conn *fc); +int fuse_conn_init(struct fuse_conn *fc); /** * Release reference to fuse_conn diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c index 3ac204e..705c98b 100644 --- a/fs/fuse/inode.c +++ b/fs/fuse/inode.c @@ -424,12 +424,14 @@ int fuse_invalidate_files(struct fuse_conn *fc, u64 nodeid) spin_lock(&fc->lock); list_for_each_entry(fud, &fc->devices, entry) { 
struct fuse_pqueue *fpq = &fud->pq; + struct fuse_iqueue *fiq = fud->fiq; spin_lock(&fpq->lock); fuse_kill_requests(fc, inode, &fpq->processing); + fuse_kill_requests(fc, inode, &fiq->pending); fuse_kill_requests(fc, inode, &fpq->io); spin_unlock(&fpq->lock); } - fuse_kill_requests(fc, inode, &fc->iq.pending); + fuse_kill_requests(fc, inode, &fc->main_iq.pending); fuse_kill_requests(fc, inode, &fc->bg_queue); wake_up(&fi->page_waitq); /* readpage[s] can wait on fuse wb */ spin_unlock(&fc->lock); @@ -712,8 +714,9 @@ static void fuse_pqueue_init(struct fuse_pqueue *fpq) fpq->connected = 1; } -void fuse_conn_init(struct fuse_conn *fc) +int fuse_conn_init(struct fuse_conn *fc) { + int cpu; memset(fc, 0, sizeof(*fc)); spin_lock_init(&fc->lock); mutex_init(&fc->inst_mutex); @@ -722,7 +725,12 @@ void fuse_conn_init(struct fuse_conn *fc) atomic_set(&fc->dev_count, 1); init_waitqueue_head(&fc->blocked_waitq); init_waitqueue_head(&fc->reserved_req_waitq); - fuse_iqueue_init(&fc->iq); + fuse_iqueue_init(&fc->main_iq); + fc->iqs = alloc_percpu(struct fuse_iqueue); + if (!fc->iqs) + return -ENOMEM; + for_each_online_cpu(cpu) + fuse_iqueue_init(per_cpu_ptr(fc->iqs, cpu)); INIT_LIST_HEAD(&fc->bg_queue); INIT_LIST_HEAD(&fc->entry); INIT_LIST_HEAD(&fc->conn_files); @@ -737,6 +745,7 @@ void fuse_conn_init(struct fuse_conn *fc) fc->connected = 1; fc->attr_version = 1; get_random_bytes(&fc->scramble_key, sizeof(fc->scramble_key)); + return 0; } EXPORT_SYMBOL_GPL(fuse_conn_init); @@ -1070,6 +1079,7 @@ static void fuse_send_init(struct fuse_conn *fc, struct fuse_req *req) static void fuse_free_conn(struct fuse_conn *fc) { WARN_ON(!list_empty(&fc->devices)); + free_percpu(fc->iqs); kfree(fc); } @@ -1128,9 +1138,11 @@ struct fuse_dev *fuse_dev_alloc(struct fuse_conn *fc) fud = kzalloc(sizeof(struct fuse_dev), GFP_KERNEL); if (fud) { fud->fc = fuse_conn_get(fc); + fud->fiq = &fc->main_iq; fuse_pqueue_init(&fud->pq); spin_lock(&fc->lock); + fud->fiq->handled_by_fud++; 
list_add_tail(&fud->entry, &fc->devices); spin_unlock(&fc->lock); } @@ -1145,6 +1157,8 @@ void fuse_dev_free(struct fuse_dev *fud) if (fc) { spin_lock(&fc->lock); + fud->fiq->handled_by_fud--; + BUG_ON(fud->fiq->handled_by_fud < 0); list_del(&fud->entry); spin_unlock(&fc->lock); @@ -1205,7 +1219,11 @@ static int fuse_fill_super(struct super_block *sb, void *data, int silent) if (!fc) goto err_fput; - fuse_conn_init(fc); + err = fuse_conn_init(fc); + if (err) { + kfree(fc); + goto err_fput; + } fc->release = fuse_free_conn; fud = fuse_dev_alloc(fc); @@ -1242,13 +1260,13 @@ static int fuse_fill_super(struct super_block *sb, void *data, int silent) /* only now - we want root dentry with NULL ->d_op */ sb->s_d_op = &fuse_dentry_operations; - init_req = fuse_request_alloc(0); + init_req = fuse_request_alloc(fc, 0); if (!init_req) goto err_put_root; __set_bit(FR_BACKGROUND, &init_req->flags); if (is_bdev || (fc->flags & FUSE_UMOUNT_WAIT)) { - fc->destroy_req = fuse_request_alloc(0); + fc->destroy_req = fuse_request_alloc(fc, 0); if (!fc->destroy_req) goto err_free_init_req; } _______________________________________________ Devel mailing list [email protected] https://lists.openvz.org/mailman/listinfo/devel
