It looks like V3 is the same as V2. How about my incremental patch, attached below?
I think it makes the code clearer.

---
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 091530d..cbc93ea 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -508,7 +508,6 @@ ceph_sync_direct_write(struct kiocb *iocb, const struct 
iovec *iov,
        struct ceph_snap_context *snapc;
        struct ceph_vino vino;
        struct ceph_osd_request *req;
-       int num_ops = 1;
        struct page **pages;
        int num_pages;
        u64 len;
@@ -516,9 +515,10 @@ ceph_sync_direct_write(struct kiocb *iocb, const struct 
iovec *iov,
        int flags;
        int check_caps = 0;
        int page_align;
-       int ret, i;
+       int ret;
        struct timespec mtime = CURRENT_TIME;
        loff_t pos = iocb->ki_pos;
+       struct iov_iter i;
 
        if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
                return -EROFS;
@@ -539,21 +539,20 @@ ceph_sync_direct_write(struct kiocb *iocb, const struct 
iovec *iov,
        flags = CEPH_OSD_FLAG_ORDERSNAP |
                CEPH_OSD_FLAG_ONDISK |
                CEPH_OSD_FLAG_WRITE;
-       num_ops++;      /* Also include a 'startsync' command. */
 
-       for (i = 0; i < nr_segs && count; i++) {
-               void __user *data = iov[i].iov_base;
-               size_t left;
+       iov_iter_init(&i, iov, nr_segs, count, 0);
+
+       for (iov_iter_count(&i) > 0) {
+               void __user *data = i.iov->iov_base + i.iov_offset;
+               u64 len = i.iov->iov_len - i.iov_offset;
 
-               left = min(count, iov[i].iov_len);
-more:
                page_align = (unsigned long)data & ~PAGE_MASK;
-               len = left;
 
                snapc = ci->i_snap_realm->cached_context;
                vino = ceph_vino(inode);
                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-                                           vino, pos, &len, num_ops,
+                                           vino, pos, &len,
+                                           2, /* include a 'startsync' command 
*/
                                            CEPH_OSD_OP_WRITE, flags, snapc,
                                            ci->i_truncate_seq,
                                            ci->i_truncate_size,
@@ -593,13 +592,8 @@ out:
                if (ret == 0) {
                        pos += len;
                        written += len;
-                       left -= len;
-                       count -= len;
-                       data += len;
-                       if (left)
-                               goto more;
+                       iov_iter_advance(&i, (size_t)len);
 
-                       ret = written;
                        if (pos > i_size_read(inode))
                                check_caps = ceph_inode_set_size(inode, pos);
                                if (check_caps)
@@ -607,14 +601,14 @@ out:
                                                        CHECK_CAPS_AUTHONLY,
                                                        NULL);
                } else {
-                       if (ret != -EOLDSNAPC && written > 0)
-                               ret = written;
                        break;
                }
        }
 
-       if (ret > 0)
+       if (ret != -EOLDSNAPC && written > 0) {
                iocb->ki_pos = pos;
+               ret = written;
+       }
        return ret;
 }
 
@@ -636,10 +630,9 @@ static ssize_t ceph_sync_write(struct kiocb *iocb, const 
struct iovec *iov,
        struct ceph_snap_context *snapc;
        struct ceph_vino vino;
        struct ceph_osd_request *req;
-       int num_ops = 1;
        struct page **pages;
-       int num_pages;
        u64 len;
+       int num_pages;
        int written = 0;
        int flags;
        int check_caps = 0;
@@ -670,18 +663,14 @@ static ssize_t ceph_sync_write(struct kiocb *iocb, const 
struct iovec *iov,
 
        iov_iter_init(&i, iov, nr_segs, count, 0);
 
-       while (i.count) {
-               void __user *data;
+       while ((len = iov_iter_count(&i)) > 0) {
                size_t left;
-
-               left = i.count;
-more:
-               len = left;
+               int n;
 
                snapc = ci->i_snap_realm->cached_context;
                vino = ceph_vino(inode);
                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-                                           vino, pos, &len, num_ops,
+                                           vino, pos, &len, 1,
                                            CEPH_OSD_OP_WRITE, flags, snapc,
                                            ci->i_truncate_seq,
                                            ci->i_truncate_size,
@@ -702,42 +691,17 @@ more:
                        ret = PTR_ERR(pages);
                        goto out;
                }
-               if (len <= i.iov[0].iov_len - i.iov_offset) {
-                       data = i.iov[0].iov_base + i.iov_offset;
-                       ret = ceph_copy_user_to_page_vector(pages,
-                                                           data, 0, len);
 
-                       if (ret > 0)
-                               iov_iter_advance(&i, ret);
-               } else {
-                       int l, k = 0, copyed = 0;
-                       size_t tmp = len;
-
-                       while (tmp) {
-                               data = i.iov[0].iov_base + i.iov_offset;
-                               l = i.iov[0].iov_len - i.iov_offset;
-
-                               if (tmp < l) {
-                                       ret = 
ceph_copy_user_to_page_vector(&pages[k],
-                                                                           
data,
-                                                                           
copyed,
-                                                                           
tmp);
-                                       if (ret > 0)
-                                               iov_iter_advance(&i, ret);
-                                       break;
-                               } else if (l) {
-                                       ret = 
ceph_copy_user_to_page_vector(&pages[k],
-                                                                           
data,
-                                                                           
copyed,
-                                                                           l);
-                                       if (ret < 0)
-                                               break;
-                                       iov_iter_advance(&i, ret);
-                                       copyed += ret;
-                                       tmp -= ret;
-                                       k = calc_pages_for(0, copyed + 1) - 1;
-                               }
+               left = len;
+               for (n = 0; n < num_pages; n++) {
+                       size_t plen = min(left, PAGE_SIZE);
+                       ret = iov_iter_copy_from_user(pages[n], &i, 0, l);
+                       if (ret != plen) {
+                               ret = -EFAULT;
+                               break;
                        }
+                       left -= ret;
+                       iov_iter_advance(&i, ret);
                }
 
                if (ret < 0) {
@@ -764,26 +728,23 @@ out:
                if (ret == 0) {
                        pos += len;
                        written += len;
-                       left -= len;
-                       if (left)
-                               goto more;
 
-                       ret = written;
-                       if (pos > i_size_read(inode))
+                       if (pos > i_size_read(inode)) {
                                check_caps = ceph_inode_set_size(inode, pos);
                                if (check_caps)
                                        ceph_check_caps(ceph_inode(inode),
                                                        CHECK_CAPS_AUTHONLY,
                                                        NULL);
+                       }
                } else {
-                       if (ret != -EOLDSNAPC && written > 0)
-                               ret = written;
                        break;
                }
        }
 
-       if (ret > 0)
+       if (ret != -EOLDSNAPC && written > 0) {
+               ret = written;
                iocb->ki_pos = pos;
+       }
        return ret;
 }
 
---


On 09/10/2013 10:04 AM, majianpeng wrote:
> For writev/pwritev sync operations, ceph only handles the first iov.
> It ignores the other iovs. Now implement this.
> I divided the sync write operation into two functions: one for
> direct writes, the other for non-direct sync writes. This is because for
> non-direct sync writes we can merge the iovs into one, but for direct writes
> we can't merge the iovs.
> 
> V2:
>   - use struct iov_iter instead of cloning the iovs in ceph_sync_write.
> 
> Signed-off-by: Jianpeng Ma <[email protected]>
> Reviewed-by: Yan, Zheng <[email protected]>
> ---
>  fs/ceph/file.c | 314 
> ++++++++++++++++++++++++++++++++++++++++++---------------
>  1 file changed, 234 insertions(+), 80 deletions(-)
> 
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 1c28c52..8dcde64 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -547,17 +547,19 @@ static void ceph_sync_write_unsafe(struct 
> ceph_osd_request *req, bool unsafe)
>       }
>  }
>  
> +
>  /*
> - * Synchronous write, straight from __user pointer or user pages (if
> - * O_DIRECT).
> + * Synchronous write, straight from __user pointer or user pages.
>   *
>   * If write spans object boundary, just do multiple writes.  (For a
>   * correct atomic write, we should e.g. take write locks on all
>   * objects, rollback on failure, etc.)
>   */
> -static ssize_t ceph_sync_write(struct file *file, const char __user *data,
> -                            size_t left, loff_t pos, loff_t *ppos)
> +static ssize_t
> +ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
> +                    unsigned long nr_segs, size_t count)
>  {
> +     struct file *file = iocb->ki_filp;
>       struct inode *inode = file_inode(file);
>       struct ceph_inode_info *ci = ceph_inode(inode);
>       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> @@ -571,59 +573,55 @@ static ssize_t ceph_sync_write(struct file *file, const 
> char __user *data,
>       int written = 0;
>       int flags;
>       int check_caps = 0;
> -     int page_align, io_align;
> -     unsigned long buf_align;
> -     int ret;
> +     int page_align;
> +     int ret, i;
>       struct timespec mtime = CURRENT_TIME;
> -     bool own_pages = false;
> +     loff_t pos = iocb->ki_pos;
>  
>       if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
>               return -EROFS;
>  
> -     dout("sync_write on file %p %lld~%u %s\n", file, pos,
> -          (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
> +     dout("sync_direct_write on file %p %lld~%u\n", file, pos,
> +          (unsigned)count);
>  
> -     ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
> +     ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
>       if (ret < 0)
>               return ret;
>  
>       ret = invalidate_inode_pages2_range(inode->i_mapping,
>                                           pos >> PAGE_CACHE_SHIFT,
> -                                         (pos + left) >> PAGE_CACHE_SHIFT);
> +                                         (pos + count) >> PAGE_CACHE_SHIFT);
>       if (ret < 0)
>               dout("invalidate_inode_pages2_range returned %d\n", ret);
>  
>       flags = CEPH_OSD_FLAG_ORDERSNAP |
>               CEPH_OSD_FLAG_ONDISK |
>               CEPH_OSD_FLAG_WRITE;
> -     if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
> -             flags |= CEPH_OSD_FLAG_ACK;
> -     else
> -             num_ops++;      /* Also include a 'startsync' command. */
> +     num_ops++;      /* Also include a 'startsync' command. */
>  
> -     /*
> -      * we may need to do multiple writes here if we span an object
> -      * boundary.  this isn't atomic, unfortunately.  :(
> -      */
> -more:
> -     io_align = pos & ~PAGE_MASK;
> -     buf_align = (unsigned long)data & ~PAGE_MASK;
> -     len = left;
> +     for (i = 0; i < nr_segs && count; i++) {
> +             void __user *data = iov[i].iov_base;
> +             size_t left;
>  
> -     snapc = ci->i_snap_realm->cached_context;
> -     vino = ceph_vino(inode);
> -     req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> -                                 vino, pos, &len, num_ops,
> -                                 CEPH_OSD_OP_WRITE, flags, snapc,
> -                                 ci->i_truncate_seq, ci->i_truncate_size,
> -                                 false);
> -     if (IS_ERR(req))
> -             return PTR_ERR(req);
> +             left = min(count, iov[i].iov_len);
> +more:
> +             page_align = (unsigned long)data & ~PAGE_MASK;
> +             len = left;
> +
> +             snapc = ci->i_snap_realm->cached_context;
> +             vino = ceph_vino(inode);
> +             req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> +                                         vino, pos, &len, num_ops,
> +                                         CEPH_OSD_OP_WRITE, flags, snapc,
> +                                         ci->i_truncate_seq,
> +                                         ci->i_truncate_size,
> +                                         false);
> +             if (IS_ERR(req)) {
> +                     ret = PTR_ERR(req);
> +                     goto out;
> +             }
>  
> -     /* write from beginning of first page, regardless of io alignment */
> -     page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
> -     num_pages = calc_pages_for(page_align, len);
> -     if (file->f_flags & O_DIRECT) {
> +             num_pages = calc_pages_for(page_align, len);
>               pages = ceph_get_direct_page_vector(data, num_pages, false);
>               if (IS_ERR(pages)) {
>                       ret = PTR_ERR(pages);
> @@ -635,61 +633,215 @@ more:
>                * may block.
>                */
>               truncate_inode_pages_range(inode->i_mapping, pos,
> -                                        (pos+len) | (PAGE_CACHE_SIZE-1));
> -     } else {
> +                                (pos+len) | (PAGE_CACHE_SIZE-1));
> +             osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
> +                                             false, false);
> +
> +             /* BUG_ON(vino.snap != CEPH_NOSNAP); */
> +             ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
> +
> +             ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> +             if (!ret)
> +                     ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> +
> +             ceph_put_page_vector(pages, num_pages, false);
> +
> +out:
> +             ceph_osdc_put_request(req);
> +             if (ret == 0) {
> +                     pos += len;
> +                     written += len;
> +                     left -= len;
> +                     count -= len;
> +                     data += len;
> +                     if (left)
> +                             goto more;
> +
> +                     ret = written;
> +                     if (pos > i_size_read(inode))
> +                             check_caps = ceph_inode_set_size(inode, pos);
> +                             if (check_caps)
> +                                     ceph_check_caps(ceph_inode(inode),
> +                                                     CHECK_CAPS_AUTHONLY,
> +                                                     NULL);
> +             } else {
> +                     if (ret != -EOLDSNAPC && written > 0)
> +                             ret = written;
> +                     break;
> +             }
> +     }
> +
> +     if (ret > 0)
> +             iocb->ki_pos = pos;
> +     return ret;
> +}
> +
> +
> +/*
> + * Synchronous write, straight from __user pointer or user pages.
> + *
> + * If write spans object boundary, just do multiple writes.  (For a
> + * correct atomic write, we should e.g. take write locks on all
> + * objects, rollback on failure, etc.)
> + */
> +static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov,
> +                            unsigned long nr_segs, size_t count)
> +{
> +     struct file *file = iocb->ki_filp;
> +     struct inode *inode = file_inode(file);
> +     struct ceph_inode_info *ci = ceph_inode(inode);
> +     struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> +     struct ceph_snap_context *snapc;
> +     struct ceph_vino vino;
> +     struct ceph_osd_request *req;
> +     int num_ops = 1;
> +     struct page **pages;
> +     int num_pages;
> +     u64 len;
> +     int written = 0;
> +     int flags;
> +     int check_caps = 0;
> +     int ret;
> +     struct timespec mtime = CURRENT_TIME;
> +     loff_t pos = iocb->ki_pos;
> +     struct iov_iter i;
> +
> +     if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
> +             return -EROFS;
> +
> +     dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
> +
> +     ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
> +     if (ret < 0)
> +             return ret;
> +
> +     ret = invalidate_inode_pages2_range(inode->i_mapping,
> +                                         pos >> PAGE_CACHE_SHIFT,
> +                                         (pos + count) >> PAGE_CACHE_SHIFT);
> +     if (ret < 0)
> +             dout("invalidate_inode_pages2_range returned %d\n", ret);
> +
> +     flags = CEPH_OSD_FLAG_ORDERSNAP |
> +             CEPH_OSD_FLAG_ONDISK |
> +             CEPH_OSD_FLAG_WRITE |
> +             CEPH_OSD_FLAG_ACK;
> +
> +     iov_iter_init(&i, iov, nr_segs, count, 0);
> +
> +     while (i.count) {
> +             void __user *data;
> +             size_t left;
> +
> +             left = i.count;
> +more:
> +             len = left;
> +
> +             snapc = ci->i_snap_realm->cached_context;
> +             vino = ceph_vino(inode);
> +             req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> +                                         vino, pos, &len, num_ops,
> +                                         CEPH_OSD_OP_WRITE, flags, snapc,
> +                                         ci->i_truncate_seq,
> +                                         ci->i_truncate_size,
> +                                         false);
> +             if (IS_ERR(req)) {
> +                     ret = PTR_ERR(req);
> +                     goto out;
> +             }
> +
> +             /*
> +              * write from beginning of first page,
> +              * regardless of io alignment
> +              */
> +             num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
> +
>               pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
>               if (IS_ERR(pages)) {
>                       ret = PTR_ERR(pages);
>                       goto out;
>               }
> -             ret = ceph_copy_user_to_page_vector(pages, data, pos, len);
> +             if (len <= i.iov[0].iov_len - i.iov_offset) {
> +                     data = i.iov[0].iov_base + i.iov_offset;
> +                     ret = ceph_copy_user_to_page_vector(pages,
> +                                                         data, 0, len);
> +
> +                     if (ret > 0)
> +                             iov_iter_advance(&i, ret);
> +             } else {
> +                     int l, k = 0, copyed = 0;
> +                     size_t tmp = len;
> +
> +                     while (tmp) {
> +                             data = i.iov[0].iov_base + i.iov_offset;
> +                             l = i.iov[0].iov_len - i.iov_offset;
> +
> +                             if (tmp < l) {
> +                                     ret = 
> ceph_copy_user_to_page_vector(&pages[k],
> +                                                                         
> data,
> +                                                                         
> copyed,
> +                                                                         
> tmp);
> +                                     if (ret > 0)
> +                                             iov_iter_advance(&i, ret);
> +                                     break;
> +                             } else if (l) {
> +                                     ret = 
> ceph_copy_user_to_page_vector(&pages[k],
> +                                                                         
> data,
> +                                                                         
> copyed,
> +                                                                         l);
> +                                     if (ret < 0)
> +                                             break;
> +                                     iov_iter_advance(&i, ret);
> +                                     copyed += ret;
> +                                     tmp -= ret;
> +                                     k = calc_pages_for(0, copyed + 1) - 1;
> +                             }
> +                     }
> +             }
> +
>               if (ret < 0) {
>                       ceph_release_page_vector(pages, num_pages);
>                       goto out;
>               }
>  
> -             if ((file->f_flags & O_SYNC) == 0) {
> -                     /* get a second commit callback */
> -                     req->r_unsafe_callback = ceph_sync_write_unsafe;
> -                     req->r_inode = inode;
> -                     own_pages = true;
> -             }
> -     }
> -     osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
> -                                     false, own_pages);
> +             /* get a second commit callback */
> +             req->r_unsafe_callback = ceph_sync_write_unsafe;
> +             req->r_inode = inode;
>  
> -     /* BUG_ON(vino.snap != CEPH_NOSNAP); */
> -     ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
> +             osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
> +                                             false, true);
>  
> -     ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> -     if (!ret)
> -             ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> +             /* BUG_ON(vino.snap != CEPH_NOSNAP); */
> +             ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
>  
> -     if (file->f_flags & O_DIRECT)
> -             ceph_put_page_vector(pages, num_pages, false);
> -     else if (file->f_flags & O_SYNC)
> -             ceph_release_page_vector(pages, num_pages);
> +             ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> +             if (!ret)
> +                     ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
>  
>  out:
> -     ceph_osdc_put_request(req);
> -     if (ret == 0) {
> -             pos += len;
> -             written += len;
> -             left -= len;
> -             data += len;
> -             if (left)
> -                     goto more;
> -
> -             ret = written;
> -             *ppos = pos;
> -             if (pos > i_size_read(inode))
> -                     check_caps = ceph_inode_set_size(inode, pos);
> -             if (check_caps)
> -                     ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
> -                                     NULL);
> -     } else if (ret != -EOLDSNAPC && written > 0) {
> -             ret = written;
> +             ceph_osdc_put_request(req);
> +             if (ret == 0) {
> +                     pos += len;
> +                     written += len;
> +                     left -= len;
> +                     if (left)
> +                             goto more;
> +
> +                     ret = written;
> +                     if (pos > i_size_read(inode))
> +                             check_caps = ceph_inode_set_size(inode, pos);
> +                             if (check_caps)
> +                                     ceph_check_caps(ceph_inode(inode),
> +                                                     CHECK_CAPS_AUTHONLY,
> +                                                     NULL);
> +             } else {
> +                     if (ret != -EOLDSNAPC && written > 0)
> +                             ret = written;
> +                     break;
> +             }
>       }
> +
> +     if (ret > 0)
> +             iocb->ki_pos = pos;
>       return ret;
>  }
>  
> @@ -844,11 +996,13 @@ retry_snap:
>            inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
>  
>       if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
> -         (iocb->ki_filp->f_flags & O_DIRECT) ||
> -         (fi->flags & CEPH_F_SYNC)) {
> +         (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
>               mutex_unlock(&inode->i_mutex);
> -             written = ceph_sync_write(file, iov->iov_base, count,
> -                                       pos, &iocb->ki_pos);
> +             if (file->f_flags & O_DIRECT)
> +                     written = ceph_sync_direct_write(iocb, iov,
> +                                                      nr_segs, count);
> +             else
> +                     written = ceph_sync_write(iocb, iov, nr_segs, count);
>               if (written == -EOLDSNAPC) {
>                       dout("aio_write %p %llx.%llx %llu~%u"
>                               "got EOLDSNAPC, retrying\n",
> 

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to