pio should be the main entity of the whole driver, and
waiting for a locked cluster should be done via pio
postponing. So, we rework merge to fit that.

Signed-off-by: Kirill Tkhai <[email protected]>
---
 drivers/md/dm-ploop-cmd.c    |  200 ++++++++++++------------------------------
 drivers/md/dm-ploop-map.c    |   20 +++-
 drivers/md/dm-ploop-target.c |    5 +
 drivers/md/dm-ploop.h        |   32 ++++---
 4 files changed, 92 insertions(+), 165 deletions(-)

diff --git a/drivers/md/dm-ploop-cmd.c b/drivers/md/dm-ploop-cmd.c
index b36bb158a3ac..401380de25db 100644
--- a/drivers/md/dm-ploop-cmd.c
+++ b/drivers/md/dm-ploop-cmd.c
@@ -3,21 +3,11 @@
 #include <linux/uio.h>
 #include <linux/ctype.h>
 #include <linux/umh.h>
+#include <linux/sched/signal.h>
 #include "dm-ploop.h"
 
 #define DM_MSG_PREFIX "ploop"
 
-static void ploop_queue_deferred_cmd(struct ploop *ploop, struct ploop_cmd 
*cmd)
-{
-       unsigned long flags;
-
-       spin_lock_irqsave(&ploop->deferred_lock, flags);
-       BUG_ON(ploop->deferred_cmd && ploop->deferred_cmd != cmd);
-       ploop->deferred_cmd = cmd;
-       spin_unlock_irqrestore(&ploop->deferred_lock, flags);
-       queue_work(ploop->wq, &ploop->worker);
-}
-
 /*
  * Assign newly allocated memory for BAT array and holes_bitmap
  * before grow.
@@ -557,8 +547,6 @@ static int ploop_resize(struct ploop *ploop, sector_t 
new_sectors)
        cmd.resize.hb_nr = hb_nr;
        cmd.resize.new_sectors = new_sectors;
        cmd.resize.md0 = md0;
-       cmd.retval = 0;
-       cmd.ploop = ploop;
 
        ploop_suspend_submitting_pios(ploop);
        ret = process_resize_cmd(ploop, &cmd);
@@ -570,106 +558,75 @@ static int ploop_resize(struct ploop *ploop, sector_t 
new_sectors)
        free_md_pages_tree(&cmd.resize.md_pages_root);
        return ret;
 }
-
-static void ploop_queue_deferred_cmd_wrapper(struct ploop *ploop,
-                                            int ret, void *data)
+static void service_pio_endio(struct pio *pio, void *data, blk_status_t status)
 {
-       struct ploop_cmd *cmd = data;
-
-       if (ret) {
-               /* kwork will see this at next time it is on cpu */
-               WRITE_ONCE(cmd->retval, ret);
-       }
-       atomic_inc(&cmd->merge.nr_available);
-       ploop_queue_deferred_cmd(cmd->ploop, cmd);
-}
-
-/* Find mergeable cluster and return it in cmd->merge.cluster */
-static bool iter_delta_clusters(struct ploop *ploop, struct ploop_cmd *cmd)
-{
-       unsigned int dst_cluster, *cluster = &cmd->merge.cluster;
-       u8 level;
-       bool skip;
-
-       BUG_ON(cmd->type != PLOOP_CMD_MERGE_SNAPSHOT);
-
-       for (; *cluster < ploop->nr_bat_entries; ++*cluster) {
-               /*
-                * Check *cluster is provided by the merged delta.
-                * We are in kwork, so bat_rwlock is not needed
-                * (see comment in process_one_deferred_bio()).
-                */
-               /* FIXME: Optimize this. ploop_bat_entries() is overkill */
-               dst_cluster = ploop_bat_entries(ploop, *cluster, &level);
-               if (dst_cluster == BAT_ENTRY_NONE ||
-                   level != ploop->nr_deltas - 2)
-                       continue;
-
-               spin_lock_irq(&ploop->deferred_lock);
-               skip = find_lk_of_cluster(ploop, *cluster);
-               spin_unlock_irq(&ploop->deferred_lock);
-               if (skip) {
-                       /*
-                        * Cluster is locked (maybe, under COW).
-                        * Skip it and try to repeat later.
-                        */
-                       cmd->merge.do_repeat = true;
-                       continue;
-               }
+       struct ploop *ploop = pio->ploop;
+       blk_status_t *status_ptr = data;
+       unsigned long flags;
 
-               return true;
+       if (unlikely(status)) {
+               spin_lock_irqsave(&ploop->err_status_lock, flags);
+               *status_ptr = status;
+               spin_unlock_irqrestore(&ploop->err_status_lock, flags);
        }
 
-       return false;
+       if (atomic_dec_return(&ploop->service_pios) < MERGE_PIOS_MAX / 2)
+               wake_up(&ploop->service_wq);
 }
 
-static void process_merge_latest_snapshot_cmd(struct ploop *ploop,
-                                             struct ploop_cmd *cmd)
+static int process_merge_latest_snapshot(struct ploop *ploop)
 {
-       unsigned int dst_cluster, *cluster = &cmd->merge.cluster;
-       u8 level;
-
-       if (cmd->retval)
-               goto out;
-
-       while (iter_delta_clusters(ploop, cmd)) {
-               /*
-                * We are in kwork, so bat_rwlock is not needed
-                * (we can't race with changing BAT, since cmds
-                *  are processed before bios and piwb is sync).
-                */
-               /* FIXME: Optimize this: ploop_bat_entries() is overkill */
-               dst_cluster = ploop_bat_entries(ploop, *cluster, &level);
+       static blk_status_t service_status;
+       struct bio_vec bvec = {0};
+       struct pio *pio;
+       int ret = 0;
+       u32 clu;
 
-               /* Check we can submit one more cow in parallel */
-               if (!atomic_add_unless(&cmd->merge.nr_available, -1, 0))
-                       return;
-               /*
-                * This adds cluster lk. Further write bios to *cluster will go
-                * from ploop_map to kwork (because bat_levels[*cluster] is not
-                * top_level()), so they will see the lk.
-                */
-               if (submit_cluster_cow(ploop, level, *cluster, dst_cluster,
-                                   ploop_queue_deferred_cmd_wrapper, cmd)) {
-                       atomic_inc(&cmd->merge.nr_available);
-                       cmd->retval = -ENOMEM;
-                       goto out;
+       for (clu = 0; clu < ploop->nr_bat_entries; clu++) {
+               if (fatal_signal_pending(current)) {
+                       ret = -EINTR;
+                       break;
+               }
+               pio = kmalloc(sizeof(*pio), GFP_KERNEL);
+               if (!pio) {
+                       ret = -ENOMEM;
+                       break;
+               }
+               init_pio(ploop, REQ_OP_WRITE, pio);
+               pio->free_on_endio = true;
+               pio->bi_io_vec = &bvec;
+               pio->bi_iter.bi_sector = CLU_TO_SEC(ploop, clu);
+               pio->bi_iter.bi_size = 0;
+               pio->bi_iter.bi_idx = 0;
+               pio->bi_iter.bi_bvec_done = 0;
+               pio->endio_cb = service_pio_endio;
+               pio->endio_cb_data = &service_status;
+               pio->is_fake_merge = true;
+               WARN_ON_ONCE(!fake_merge_pio(pio));
+
+               defer_pios(ploop, pio, NULL);
+
+               if (atomic_inc_return(&ploop->service_pios) == MERGE_PIOS_MAX) {
+                       wait_event(ploop->service_wq,
+                                       atomic_read(&ploop->service_pios) < 
MERGE_PIOS_MAX);
                }
 
-               ++*cluster;
+               if (unlikely(READ_ONCE(service_status)))
+                       break;
        }
-out:
-       if (atomic_read(&cmd->merge.nr_available) != NR_MERGE_BIOS) {
-               /* Wait till last COW queues us */
-               return;
+
+       wait_event(ploop->service_wq, !atomic_read(&ploop->service_pios));
+       if (!ret) {
+               spin_lock_irq(&ploop->err_status_lock);
+               ret = blk_status_to_errno(service_status);
+               spin_unlock_irq(&ploop->err_status_lock);
        }
 
-       complete(&cmd->comp); /* Last touch of cmd memory */
+       return ret;
 }
 
 static int ploop_merge_latest_snapshot(struct ploop *ploop)
 {
-       struct ploop_cmd cmd;
        struct file *file;
        u8 level;
        int ret;
@@ -680,33 +637,14 @@ static int ploop_merge_latest_snapshot(struct ploop 
*ploop)
                return -EROFS;
        if (ploop->nr_deltas < 2)
                return -ENOENT;
-again:
-       memset(&cmd, 0, sizeof(cmd));
-       cmd.type = PLOOP_CMD_MERGE_SNAPSHOT;
-       cmd.ploop = ploop;
-       atomic_set(&cmd.merge.nr_available, NR_MERGE_BIOS);
-
-       init_completion(&cmd.comp);
-       ploop_queue_deferred_cmd(ploop, &cmd);
-       ret = wait_for_completion_interruptible(&cmd.comp);
-       if (ret) {
-               /*
-                * process_merge_latest_snapshot_cmd() will see this
-                * later or earlier. Take a lock if you want earlier.
-                */
-               WRITE_ONCE(cmd.retval, -EINTR);
-               wait_for_completion(&cmd.comp);
-       }
 
-       if (cmd.retval)
+       ret = process_merge_latest_snapshot(ploop);
+       if (ret)
                goto out;
 
-       if (cmd.merge.do_repeat)
-               goto again;
-
        /* Delta merged. Release delta's file */
-       cmd.retval = ploop_suspend_submitting_pios(ploop);
-       if (cmd.retval)
+       ret = ploop_suspend_submitting_pios(ploop);
+       if (ret)
                goto out;
 
        write_lock_irq(&ploop->bat_rwlock);
@@ -719,7 +657,7 @@ static int ploop_merge_latest_snapshot(struct ploop *ploop)
 
        ploop_resume_submitting_pios(ploop);
 out:
-       return cmd.retval;
+       return ret;
 }
 
 static void notify_delta_merged(struct ploop *ploop, u8 level,
@@ -1154,28 +1092,6 @@ static int ploop_flip_upper_deltas(struct ploop *ploop)
        return process_flip_upper_deltas(ploop);
 }
 
-/* Handle user commands requested via "message" interface */
-void process_deferred_cmd(struct ploop *ploop)
-       __releases(&ploop->deferred_lock)
-       __acquires(&ploop->deferred_lock)
-{
-       struct ploop_cmd *cmd = ploop->deferred_cmd;
-
-       if (likely(!cmd))
-               return;
-
-       ploop->deferred_cmd = NULL;
-       spin_unlock_irq(&ploop->deferred_lock);
-
-       if (cmd->type == PLOOP_CMD_MERGE_SNAPSHOT) {
-               process_merge_latest_snapshot_cmd(ploop, cmd);
-       } else {
-               cmd->retval = -EINVAL;
-               complete(&cmd->comp);
-       }
-       spin_lock_irq(&ploop->deferred_lock);
-}
-
 static int ploop_get_event(struct ploop *ploop, char *result, unsigned int 
maxlen)
 {
        unsigned int sz = 0;
diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
index bcdc63a1d5c9..dc2268670f70 100644
--- a/drivers/md/dm-ploop-map.c
+++ b/drivers/md/dm-ploop-map.c
@@ -70,6 +70,7 @@ void init_pio(struct ploop *ploop, unsigned int bi_op, struct 
pio *pio)
        pio->bi_op = bi_op;
        pio->wants_discard_index_cleanup = false;
        pio->is_data_alloc = false;
+       pio->is_fake_merge = false;
        pio->free_on_endio = false;
        pio->ref_index = PLOOP_REF_INDEX_INVALID;
        pio->bi_status = BLK_STS_OK;
@@ -478,6 +479,14 @@ static bool pio_endio_if_all_zeros(struct pio *pio)
        return true;
 }
 
+static bool pio_endio_if_merge_fake_pio(struct pio *pio)
+{
+       if (likely(!fake_merge_pio(pio)))
+               return false;
+       pio_endio(pio);
+       return true;
+}
+
 static int punch_hole(struct file *file, loff_t pos, loff_t len)
 {
        return vfs_fallocate(file, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE,
@@ -1136,9 +1145,9 @@ static bool postpone_if_cluster_locked(struct ploop 
*ploop, struct pio *pio,
        return e_h != NULL;
 }
 
-int submit_cluster_cow(struct ploop *ploop, unsigned int level,
-                      unsigned int cluster, unsigned int dst_cluster,
-                      void (*end_fn)(struct ploop *, int, void *), void *data)
+static int submit_cluster_cow(struct ploop *ploop, unsigned int level,
+                             unsigned int cluster, unsigned int dst_cluster,
+                             void (*end_fn)(struct ploop *, int, void *), void 
*data)
 {
        struct ploop_cow *cow = NULL;
        struct pio *pio = NULL;
@@ -1392,6 +1401,8 @@ static int process_one_deferred_bio(struct ploop *ploop, 
struct pio *pio,
 
        if (cluster_is_in_top_delta(ploop, cluster)) {
                /* Already mapped */
+               if (pio_endio_if_merge_fake_pio(pio))
+                       goto out;
                goto queue;
        } else if (!op_is_write(pio->bi_op)) {
                /*
@@ -1538,13 +1549,10 @@ void do_ploop_work(struct work_struct *ws)
         *
         * Currenly, it's impossible to submit two bat pages update
         * in parallel, since the update uses global ploop->bat_page.
-        * Note, that process_deferred_cmd() expects there is no
-        * pending index wb.
         */
        ploop_index_wb_init(&piwb, ploop);
 
        spin_lock_irq(&ploop->deferred_lock);
-       process_deferred_cmd(ploop);
        process_delta_wb(ploop, &piwb);
 
        list_splice_init(&ploop->deferred_pios, &deferred_pios);
diff --git a/drivers/md/dm-ploop-target.c b/drivers/md/dm-ploop-target.c
index 76f66fe11de1..3e05895d1cfe 100644
--- a/drivers/md/dm-ploop-target.c
+++ b/drivers/md/dm-ploop-target.c
@@ -123,10 +123,9 @@ void free_md_pages_tree(struct rb_root *root)
 
 static bool ploop_has_pending_activity(struct ploop *ploop)
 {
-       bool has;
+       bool has = false;
 
        spin_lock_irq(&ploop->deferred_lock);
-       has = ploop->deferred_cmd;
        has |= !list_empty(&ploop->deferred_pios);
        has |= !list_empty(&ploop->discard_pios);
        has |= !list_empty(&ploop->delta_cow_action_list);
@@ -312,7 +311,9 @@ static int ploop_ctr(struct dm_target *ti, unsigned int 
argc, char **argv)
        }
 
        rwlock_init(&ploop->bat_rwlock);
+       spin_lock_init(&ploop->err_status_lock);
        init_rwsem(&ploop->ctl_rwsem);
+       init_waitqueue_head(&ploop->service_wq);
        spin_lock_init(&ploop->inflight_lock);
        spin_lock_init(&ploop->deferred_lock);
 
diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h
index 2a474e5d3cb6..a2d6866d99a5 100644
--- a/drivers/md/dm-ploop.h
+++ b/drivers/md/dm-ploop.h
@@ -40,12 +40,10 @@ struct ploop_delta {
        bool is_raw;
 };
 
+#define MERGE_PIOS_MAX                 64
+
 struct ploop_cmd {
-#define PLOOP_CMD_MERGE_SNAPSHOT       3
        struct completion comp;
-       struct ploop *ploop;
-       unsigned int type;
-       int retval;
        union {
                struct {
                        sector_t new_sectors;
@@ -62,12 +60,6 @@ struct ploop_cmd {
                        unsigned int cluster, dst_cluster;
                        struct pio *pio;
                } resize;
-               struct {
-#define NR_MERGE_BIOS                  64
-                       atomic_t nr_available;
-                       unsigned int cluster; /* Currently iterated cluster */
-                       bool do_repeat;
-               } merge;
        };
 };
 
@@ -173,8 +165,11 @@ struct ploop {
        struct list_head resubmit_pios; /* After partial IO */
        struct list_head enospc_pios; /* Delayed after ENOSPC */
 
+       atomic_t service_pios;
+       struct wait_queue_head service_wq;
+
+       spinlock_t err_status_lock;
        struct rw_semaphore ctl_rwsem;
-       struct ploop_cmd *deferred_cmd;
 
        /*
         * List of locked clusters (no write is possible).
@@ -230,6 +225,7 @@ struct pio {
 
        bool is_data_alloc:1;
        bool wants_discard_index_cleanup:1;
+       bool is_fake_merge:1;
        bool free_on_endio:1;
        /*
         * 0 and 1 are related to inflight_bios_ref[],
@@ -486,6 +482,16 @@ static inline struct hlist_head *ploop_htable_slot(struct 
hlist_head head[], u32
        return &head[hash_32(clu, PLOOP_HASH_TABLE_BITS)];
 }
 
+static inline bool fake_merge_pio(struct pio *pio)
+{
+       if (pio->is_fake_merge) {
+               WARN_ON_ONCE(pio->bi_iter.bi_size ||
+                            pio->bi_op != REQ_OP_WRITE);
+               return true;
+       }
+       return false;
+}
+
 extern void md_page_insert(struct ploop *ploop, struct md_page *md);
 extern void ploop_free_md_page(struct md_page *md);
 extern void free_md_pages_tree(struct rb_root *root);
@@ -499,7 +505,6 @@ extern void defer_pios(struct ploop *ploop, struct pio 
*pio, struct list_head *p
 extern void do_ploop_work(struct work_struct *ws);
 extern void do_ploop_fsync_work(struct work_struct *ws);
 extern void ploop_event_work(struct work_struct *work);
-extern void process_deferred_cmd(struct ploop *ploop);
 extern int ploop_clone_and_map(struct dm_target *ti, struct request *rq,
                    union map_info *map_context, struct request **clone);
 extern struct pio *find_lk_of_cluster(struct ploop *ploop, u32 cluster);
@@ -514,9 +519,6 @@ extern void ploop_reset_bat_update(struct ploop_index_wb *);
 extern void ploop_submit_index_wb_sync(struct ploop *, struct ploop_index_wb 
*);
 extern int ploop_message(struct dm_target *ti, unsigned int argc, char **argv,
                         char *result, unsigned int maxlen);
-extern int submit_cluster_cow(struct ploop *ploop, unsigned int level,
-                             unsigned int cluster, unsigned int dst_cluster,
-                             void (*end_fn)(struct ploop *, int, void *), void 
*data);
 
 extern struct pio * alloc_pio_with_pages(struct ploop *ploop);
 extern void free_pio_with_pages(struct ploop *ploop, struct pio *pio);


_______________________________________________
Devel mailing list
[email protected]
https://lists.openvz.org/mailman/listinfo/devel

Reply via email to