The commit is pushed to "branch-rh7-3.10.0-693.17.1.vz7.45.x-ovz" and will 
appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh7-3.10.0-693.17.1.vz7.43.7
------>
commit 8bfe46448fb9b35444e8cc7963b472459039758d
Author: Dmitry Monakhov <dmonak...@openvz.org>
Date:   Mon Feb 19 14:22:34 2018 +0300

    fuse_kio_pcs: implement truncate
    
    The idea of the truncate implementation is fairly simple.
    grow: (similar to userspace)
     1) all writes beyond i_size are queued to the inode's grow_queue
     2) update the size via userspace
     3) resubmit the pending writes
    
    shrink:
     1) stop all I/O to the inode
        a) writes are blocked by i_mutex
        b) reads are queued to shrink_queue
     2) update the size via userspace
     3) resubmit the pending reads
    
    TODO: Maybe it would be simpler to force read requests to grab
          i_mutex while a shrink is in progress, to avoid queuing.
    
    https://jira.sw.ru/browse/PSBM-80680
    Signed-off-by: Dmitry Monakhov <dmonak...@openvz.org>
---
 fs/fuse/dir.c                      |   2 +
 fs/fuse/kio/pcs/fuse_io.c          |   2 +
 fs/fuse/kio/pcs/pcs_client_types.h |   8 +
 fs/fuse/kio/pcs/pcs_cluster.h      |   7 +
 fs/fuse/kio/pcs/pcs_fuse_kdirect.c | 318 +++++++++++++++++++++++++++++++++----
 fs/fuse/kio/pcs/pcs_map.h          |   5 +
 6 files changed, 315 insertions(+), 27 deletions(-)

diff --git a/fs/fuse/dir.c b/fs/fuse/dir.c
index d4427c56012b..0ae0344be3c5 100644
--- a/fs/fuse/dir.c
+++ b/fs/fuse/dir.c
@@ -977,6 +977,7 @@ static int fuse_do_getattr(struct inode *inode, struct 
kstat *stat,
        req->in.args[0].size = sizeof(inarg);
        req->in.args[0].value = &inarg;
        req->out.numargs = 1;
+       req->io_inode = inode;
        if (fc->minor < 9)
                req->out.args[0].size = FUSE_COMPAT_ATTR_OUT_SIZE;
        else
@@ -1639,6 +1640,7 @@ void fuse_set_nowrite(struct inode *inode)
        BUG_ON(fi->writectr < 0);
        fi->writectr += FUSE_NOWRITE;
        spin_unlock(&fc->lock);
+       inode_dio_wait(inode);
        wait_event(fi->page_waitq, fi->writectr == FUSE_NOWRITE);
 }
 
diff --git a/fs/fuse/kio/pcs/fuse_io.c b/fs/fuse/kio/pcs/fuse_io.c
index c9eaa8d453db..5884fe25a20f 100644
--- a/fs/fuse/kio/pcs/fuse_io.c
+++ b/fs/fuse/kio/pcs/fuse_io.c
@@ -37,6 +37,7 @@ static void on_read_done(struct pcs_fuse_req *r, size_t size)
 
        DTRACE("do fuse_request_end req:%p op:%d err:%d\n", &r->req, 
r->req.in.h.opcode, r->req.out.h.error);
        r->req.out.args[0].size = size;
+       inode_dio_end(r->req.io_inode);
        request_end(pfc->fc, &r->req);
 }
 
@@ -56,6 +57,7 @@ static void on_write_done(struct pcs_fuse_req *r, off_t pos, 
size_t size)
        out->size = size;
 
        DTRACE("do fuse_request_end req:%p op:%d err:%d\n", &r->req, 
r->req.in.h.opcode, r->req.out.h.error);
+       inode_dio_end(r->req.io_inode);
        request_end(pfc->fc, &r->req);
 }
 
diff --git a/fs/fuse/kio/pcs/pcs_client_types.h 
b/fs/fuse/kio/pcs/pcs_client_types.h
index 3bffd4992221..0be1caff7b46 100644
--- a/fs/fuse/kio/pcs/pcs_client_types.h
+++ b/fs/fuse/kio/pcs/pcs_client_types.h
@@ -53,6 +53,14 @@ struct pcs_dentry_info {
        PCS_FILETIME_T          local_mtime;
        struct pcs_mapping      mapping;
        struct pcs_cluster_core *cluster;
+       spinlock_t              lock;
+       struct {
+               struct work_struct      work;
+               unsigned long long      shrink;
+               unsigned long long      required;
+               struct list_head        grow_queue;
+               struct list_head        shrink_queue;
+       } size;
        struct fuse_inode       *inode;
 };
 
diff --git a/fs/fuse/kio/pcs/pcs_cluster.h b/fs/fuse/kio/pcs/pcs_cluster.h
index 3a8116b705df..8b58d3cd946b 100644
--- a/fs/fuse/kio/pcs/pcs_cluster.h
+++ b/fs/fuse/kio/pcs/pcs_cluster.h
@@ -8,6 +8,12 @@ struct fuse_conn;
 /* Try to follows pcs/client/fused structure style */
 struct pcs_fuse_exec_ctx {
        struct pcs_int_request  ireq;
+       /* The file size control block */
+       struct {
+               unsigned long long      required;
+               unsigned char           granted;
+               unsigned char           waiting;
+       } size;
        struct {
                pcs_api_iorequest_t     req;
                struct bio_vec          *bvec;
@@ -24,6 +30,7 @@ struct pcs_fuse_exec_ctx {
 
 struct pcs_fuse_req {
        struct fuse_req req;
+       void (*end)(struct fuse_conn *, struct fuse_req *);
        struct pcs_fuse_exec_ctx exec;  /* Zero initialized context */
 };
 
diff --git a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c 
b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
index 1b21fc9f9ffe..29d62faa8612 100644
--- a/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
+++ b/fs/fuse/kio/pcs/pcs_fuse_kdirect.c
@@ -236,6 +236,7 @@ static int fuse_pcs_kdirect_claim_op(struct fuse_conn *fc, 
struct file *file,
        fuse_put_request(fc, req);
        return err;
 }
+static void  fuse_size_grow_work(struct work_struct *w);
 
 static int kpcs_do_file_open(struct fuse_conn *fc, struct file *file, struct 
inode *inode)
 {
@@ -263,6 +264,11 @@ static int kpcs_do_file_open(struct fuse_conn *fc, struct 
file *file, struct ino
        /* di.id.name.data  = name; */
        /* di.id.name.len   = id->name.len; */
 
+       spin_lock_init(&di->lock);
+       INIT_LIST_HEAD(&di->size.grow_queue);
+       INIT_LIST_HEAD(&di->size.shrink_queue);
+       INIT_WORK(&di->size.work, fuse_size_grow_work);
+
        pcs_mapping_init(&pfc->cc, &di->mapping);
        pcs_set_fileinfo(di, &info);
        di->cluster = &pfc->cc;
@@ -317,6 +323,8 @@ void kpcs_inode_release(struct fuse_inode *fi)
        if(!di)
                return;
 
+       BUG_ON(!list_empty(&di->size.grow_queue));
+       BUG_ON(!list_empty(&di->size.shrink_queue));
        pcs_mapping_invalidate(&di->mapping);
        pcs_mapping_deinit(&di->mapping);
        /* TODO: properly destroy dentry info here!! */
@@ -573,6 +581,178 @@ void ireq_destroy(struct pcs_int_request *ireq)
        kmem_cache_free(pcs_ireq_cachep, ireq);
 }
 
+static int submit_size_grow(struct inode *inode, unsigned long long size)
+{
+       struct fuse_conn *fc = get_fuse_conn(inode);
+       struct fuse_setattr_in inarg;
+       struct fuse_attr_out outarg;
+       struct fuse_req *req;
+       int err;
+
+       /* Caller comes here w/o i_mutex, but vfs_truncate is blocked
+          at inode_dio_wait() see fuse_set_nowrite
+        */
+       BUG_ON(!atomic_read(&inode->i_dio_count));
+
+       TRACE("ino:%ld size:%lld \n",inode->i_ino, size);
+
+       req = fuse_get_req_nopages(fc);
+       if (IS_ERR(req))
+               return PTR_ERR(req);
+
+       memset(&inarg, 0, sizeof(inarg));
+       memset(&outarg, 0, sizeof(outarg));
+
+       inarg.valid |= FATTR_SIZE;
+       inarg.size = size;
+
+       req->io_inode = inode;
+       req->in.h.opcode = FUSE_SETATTR;
+       req->in.h.nodeid = get_node_id(inode);
+       req->in.numargs = 1;
+       req->in.args[0].size = sizeof(inarg);
+       req->in.args[0].value = &inarg;
+       req->out.numargs = 1;
+       req->out.args[0].size = sizeof(outarg);
+       req->out.args[0].value = &outarg;
+
+       fuse_request_send(fc, req);
+       err = req->out.h.error;
+       fuse_put_request(fc, req);
+
+       return err;
+
+}
+
+static void fuse_size_grow_work(struct work_struct *w)
+{
+       struct pcs_dentry_info* di = container_of(w, struct pcs_dentry_info, 
size.work);
+       struct inode *inode = &di->inode->inode;
+       struct pcs_int_request* ireq, *next;
+       unsigned long long size = 0;
+       LIST_HEAD(to_submit);
+
+       spin_lock(&di->lock);
+       BUG_ON(di->size.shrink);
+       if (di->size.required) {
+               spin_unlock(&di->lock);
+               return;
+       }
+       list_for_each_entry(ireq, &di->size.grow_queue, list) {
+               struct pcs_fuse_req *r = container_of(ireq, struct 
pcs_fuse_req, exec.ireq);
+
+               TRACE("ino:%ld r(%p)->size:%lld  required:%lld\n",inode->i_ino, 
r, r->exec.size.required, size);
+
+               BUG_ON(!r->exec.size.required);
+               if (size < r->exec.size.required)
+                       size = r->exec.size.required;
+       }
+       di->size.required = size;
+       spin_unlock(&di->lock);
+       submit_size_grow(inode, size);
+
+       spin_lock(&di->lock);
+       BUG_ON(di->size.shrink);
+       BUG_ON(di->size.required != size);
+       list_for_each_entry_safe(ireq, next, &di->size.grow_queue, list) {
+               struct pcs_fuse_req *r = container_of(ireq, struct 
pcs_fuse_req, exec.ireq);
+
+               BUG_ON(!r->exec.size.required);
+               BUG_ON(!r->exec.size.waiting);
+               BUG_ON(r->exec.size.granted);
+               if (size >= r->exec.size.required) {
+                       TRACE("resubmit ino:%ld r(%p)->size:%lld  
required:%lld\n",inode->i_ino, r, r->exec.size.required, size);
+
+                       r->exec.size.waiting = 0;
+                       r->exec.size.granted = 1;
+                       list_move(&ireq->list, &to_submit);
+               }
+       }
+       di->size.required = 0;
+       if (!list_empty(&di->size.grow_queue))
+               queue_work(pcs_wq, &di->size.work);
+       spin_unlock(&di->lock);
+
+       pcs_cc_requeue(di->cluster, &to_submit);
+}
+
+static void wait_grow(struct pcs_fuse_req *r, struct pcs_dentry_info *di, 
unsigned long long required)
+{
+       assert_spin_locked(&di->lock);
+       BUG_ON(r->exec.size.waiting);
+       BUG_ON(r->req.in.h.opcode != FUSE_WRITE);
+
+       TRACE("insert ino:%ld->required:%lld r(%p)->required:%lld\n", 
r->req.io_inode->i_ino,
+             di->size.required, r, required);
+       r->exec.size.required = required;
+       r->exec.size.waiting = 1;
+       list_add_tail(&r->exec.ireq.list, &di->size.grow_queue);
+
+       if (!di->size.required)
+               queue_work(pcs_wq, &di->size.work);
+}
+static void wait_shrink(struct pcs_fuse_req *r, struct pcs_dentry_info *di)
+{
+       assert_spin_locked(&di->lock);
+       BUG_ON(r->exec.size.waiting);
+       /* Writes already blocked via fuse_set_nowrite */
+       BUG_ON(r->req.in.h.opcode != FUSE_READ);
+
+       TRACE("insert ino:%ld r:%p\n", r->req.io_inode->i_ino, r);
+       r->exec.size.waiting = 1;
+       list_add_tail(&r->exec.ireq.list, &di->size.shrink_queue);
+}
+
+/*
+ * Check the i_size boundary and defer the request if necessary.
+ * Return code:
+ *  0: ready for submission
+ * -1: should fail request
+ *  1: request placed on the pending queue
+*/
+static int pcs_fuse_prep_rw(struct pcs_fuse_req *r)
+{
+       struct fuse_inode *fi = get_fuse_inode(r->req.io_inode);
+       struct pcs_dentry_info *di = pcs_inode_from_fuse(fi);
+       int ret = 0;
+
+       spin_lock(&di->lock);
+       /* Deffer all requests if shrink requested to prevent livelock */
+       if (di->size.shrink) {
+               wait_shrink(r, di);
+               spin_unlock(&di->lock);
+               return 1;
+       }
+       if (r->req.in.h.opcode == FUSE_READ) {
+               size_t size;
+               struct fuse_read_in *in = &r->req.misc.read.in;
+
+               size = in->size;
+               if (in->offset + in->size > di->fileinfo.attr.size) {
+                       if (in->offset >= di->fileinfo.attr.size) {
+                               r->req.out.args[0].size = 0;
+                               spin_unlock(&di->lock);
+                               return -1;
+                       }
+                       size = di->fileinfo.attr.size - in->offset;
+               }
+               pcs_fuse_prep_io(r, PCS_REQ_T_READ, in->offset, size);
+       } else {
+               struct fuse_write_in *in = &r->req.misc.write.in;
+
+               if (in->offset + in->size > di->fileinfo.attr.size) {
+                       wait_grow(r, di, in->offset + in->size);
+                       ret = 1;
+               }
+               pcs_fuse_prep_io(r, PCS_REQ_T_WRITE, in->offset, in->size);
+
+       }
+       inode_dio_begin(r->req.io_inode);
+       spin_unlock(&di->lock);
+
+       return ret;
+}
+
 static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, struct fuse_req 
*req, int async)
 {
        struct pcs_fuse_req *r = pcs_req_from_fuse(req);
@@ -584,32 +764,23 @@ static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, 
struct fuse_req *req,
        BUG_ON(req->cache != pcs_fuse_req_cachep);
 
        /* Init pcs_fuse_req */
-       memset(&r->exec.io, 0, sizeof(r->exec.io));
-       memset(&r->exec.ctl, 0, sizeof(r->exec.ctl));
+       memset(&r->exec, 0, sizeof(r->exec));
        /* Use inline request structure */
        ireq = &r->exec.ireq;
        ireq_init(di, ireq);
 
        switch (r->req.in.h.opcode) {
-       case FUSE_WRITE: {
-               struct fuse_write_in *in = &r->req.misc.write.in;
-
-               pcs_fuse_prep_io(r, PCS_REQ_T_WRITE, in->offset, in->size);
-               goto submit;
-       }
+       case FUSE_WRITE:
        case FUSE_READ: {
-               struct fuse_read_in *in = &r->req.misc.read.in;
-               size_t size = in->size;
-
-               if (in->offset + in->size > di->fileinfo.attr.size) {
-                       if (in->offset >= di->fileinfo.attr.size) {
-                               req->out.args[0].size = 0;
-                               break;
-                       }
-                       size = di->fileinfo.attr.size - in->offset;
-               }
-               pcs_fuse_prep_io(r, PCS_REQ_T_READ, in->offset, size);
-               goto submit;
+               int ret;
+
+               ret = pcs_fuse_prep_rw(r);
+               if (!ret)
+                       goto submit;
+               if (ret > 0)
+                       /* Pended, nothing to do. */
+                       return;
+               break;
        }
        case FUSE_FSYNC:
                pcs_fuse_prep_io(r, PCS_REQ_T_SYNC, 0, 0);
@@ -627,12 +798,69 @@ static void pcs_fuse_submit(struct pcs_fuse_cluster *pfc, 
struct fuse_req *req,
                ireq_process(ireq);
 }
 
+static void kpcs_setattr_end(struct fuse_conn *fc, struct fuse_req *req)
+{
+       struct pcs_fuse_req *r = pcs_req_from_fuse(req);
+       struct fuse_inode *fi = get_fuse_inode(req->io_inode);
+       struct fuse_attr_out *outarg = (void*) req->out.args[0].value;
+       struct pcs_dentry_info *di = fi->private;
+
+       BUG_ON(req->in.h.opcode != FUSE_SETATTR);
+       BUG_ON(!di);
+       di = pcs_inode_from_fuse(fi);
+       spin_lock(&di->lock);
+       TRACE("update size: ino:%lu old_sz:%lld 
new:%lld\n",req->io_inode->i_ino,
+             di->fileinfo.attr.size, outarg->attr.size);
+
+       if (!req->out.h.error)
+               di->fileinfo.attr.size = outarg->attr.size;
+       spin_unlock(&di->lock);
+       if(r->end)
+               r->end(fc, req);
+}
 
-int kpcs_req_send(struct fuse_conn* fc, struct fuse_req *req, bool bg, bool lk)
+static void _pcs_shrink_end(struct fuse_conn *fc, struct fuse_req *req)
 {
        struct pcs_fuse_cluster *pfc = (struct pcs_fuse_cluster*)fc->kio.ctx;
        struct fuse_inode *fi = get_fuse_inode(req->io_inode);
+       struct pcs_dentry_info *di = fi->private;
+       LIST_HEAD(dispose);
+
+       kpcs_setattr_end(fc, req);
+
+       spin_lock(&di->lock);
+       BUG_ON(!di->size.shrink);
+       BUG_ON(di->size.required);
+       BUG_ON(!list_empty(&di->size.grow_queue));
+
+       list_splice_init(&di->size.shrink_queue, &dispose);
+       di->size.shrink = 0;
+       spin_unlock(&di->lock);
 
+       while (!list_empty(&dispose)) {
+               struct pcs_int_request* ireq = list_first_entry(&dispose, 
struct pcs_int_request, list);
+               struct pcs_fuse_req *r = container_of(ireq, struct 
pcs_fuse_req, exec.ireq);
+
+               BUG_ON(!r->exec.size.waiting);
+               BUG_ON(r->exec.size.granted);
+               BUG_ON(r->req.in.h.opcode != FUSE_READ);
+
+               TRACE("resubmit %p\n", &r->req);
+               list_del_init(&ireq->list);
+               r->exec.size.waiting = 0;
+               pcs_fuse_submit(pfc, &r->req, 1);
+       }
+}
+
+static void _pcs_grow_end(struct fuse_conn *fc, struct fuse_req *req)
+{
+       kpcs_setattr_end(fc, req);
+}
+
+static int kpcs_req_send(struct fuse_conn* fc, struct fuse_req *req, bool bg, 
bool lk)
+{
+       struct pcs_fuse_cluster *pfc = (struct pcs_fuse_cluster*)fc->kio.ctx;
+       struct fuse_inode *fi = get_fuse_inode(req->io_inode);
        if (!fc->initialized || fc->conn_error)
                return 1;
 
@@ -643,7 +871,7 @@ int kpcs_req_send(struct fuse_conn* fc, struct fuse_req 
*req, bool bg, bool lk)
         */
        BUG_ON(!list_empty(&req->list));
 
-       TRACE(" Enter req:%p op:%d bg:%d lk:%d\n", req, req->in.h.opcode, bg, 
lk);
+       TRACE(" Enter req:%p op:%d end:%p bg:%d lk:%d\n", req, 
req->in.h.opcode, req->end, bg, lk);
 
        /* TODO: This is just a crunch, Conn cleanup requires sane locking */
        if (req->in.h.opcode == FUSE_DESTROY) {
@@ -653,13 +881,49 @@ int kpcs_req_send(struct fuse_conn* fc, struct fuse_req 
*req, bool bg, bool lk)
                spin_unlock(&fc->lock);
                return 1;
        }
-       if ((req->in.h.opcode != FUSE_READ &&
-            req->in.h.opcode != FUSE_WRITE))
-               return 1;
+       switch (req->in.h.opcode) {
+       case FUSE_SETATTR: {
+               struct pcs_fuse_req *r = pcs_req_from_fuse(req);
+               struct fuse_setattr_in *inarg = (void*) req->in.args[0].value;
+               struct pcs_dentry_info *di = pcs_inode_from_fuse(fi);
+               int shrink = 0;
+
+               if (!(inarg->valid & FATTR_SIZE))
+                       return 1;
+
 
-       fi = get_fuse_inode(req->io_inode);
-       if (!fi->private)
+               spin_lock(&di->lock);
+               if (inarg->size < di->fileinfo.attr.size) {
+                       BUG_ON(di->size.shrink);
+                       di->size.shrink = shrink = 1;
+               }
+               spin_unlock(&di->lock);
+               r->end = req->end;
+               if (shrink) {
+                       BUG_ON(!mutex_is_locked(&req->io_inode->i_mutex));
+                       /* wait for aio reads in flight */
+                       inode_dio_wait(req->io_inode);
+                       /*
+                        * Writebackcache was flushed already so it is safe to
+                        * drop pcs_mapping
+                        */
+                       pcs_map_invalidate_tail(&di->mapping, inarg->size);
+                       req->end = _pcs_shrink_end;
+               } else
+                       req->end = _pcs_grow_end;
+               return 1;
+       }
+       case FUSE_READ:
+       case FUSE_WRITE:
+               fi = get_fuse_inode(req->io_inode);
+               if (!fi->private)
+                       return 1;
+
+               break;
+       default:
                return 1;
+       }
+
 
        __clear_bit(FR_BACKGROUND, &req->flags);
        __clear_bit(FR_PENDING, &req->flags);
diff --git a/fs/fuse/kio/pcs/pcs_map.h b/fs/fuse/kio/pcs/pcs_map.h
index 754e0f177d46..11176b2b80d5 100644
--- a/fs/fuse/kio/pcs/pcs_map.h
+++ b/fs/fuse/kio/pcs/pcs_map.h
@@ -260,5 +260,10 @@ static inline struct pcs_map_entry *pcs_map_get(struct 
pcs_map_entry *m)
        return m;
 }
 
+static inline void pcs_map_invalidate_tail(struct pcs_mapping * mapping, u64 
offset)
+{
+       unsigned long index = offset >> mapping->chunk_size_bits;
 
+       map_truncate_tail(mapping, index << mapping->chunk_size_bits);
+}
 #endif /* _PCS_MAP_H_ */
_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel

Reply via email to