Signed-off-by: Kirill Tkhai <ktk...@virtuozzo.com>
---
 drivers/md/Kconfig           |    8 
 drivers/md/Makefile          |    5 
 drivers/md/dm-ploop-bat.c    |  597 +++++++++++++
 drivers/md/dm-ploop-cmd.c    | 1341 +++++++++++++++++++++++++++++
 drivers/md/dm-ploop-map.c    | 1963 ++++++++++++++++++++++++++++++++++++++++++
 drivers/md/dm-ploop-target.c |  573 ++++++++++++
 drivers/md/dm-ploop.h        |  581 ++++++++++++
 mm/vmscan.c                  |    2 
 8 files changed, 5069 insertions(+), 1 deletion(-)
 create mode 100644 drivers/md/dm-ploop-bat.c
 create mode 100644 drivers/md/dm-ploop-cmd.c
 create mode 100644 drivers/md/dm-ploop-map.c
 create mode 100644 drivers/md/dm-ploop-target.c
 create mode 100644 drivers/md/dm-ploop.h

diff --git a/drivers/md/Kconfig b/drivers/md/Kconfig
index 0602e82a9516..2995a50c3d4f 100644
--- a/drivers/md/Kconfig
+++ b/drivers/md/Kconfig
@@ -640,4 +640,12 @@ config DM_ZONED
 
          If unsure, say N.
 
+config DM_PLOOP
+       tristate "Ploop target support"
+       depends on BLK_DEV_DM
+       help
+         This is the ploop1 image format interpreter on device-mapper rails.
+
+
+
 endif # MD
diff --git a/drivers/md/Makefile b/drivers/md/Makefile
index a74aaf8b1445..af19cd3e8991 100644
--- a/drivers/md/Makefile
+++ b/drivers/md/Makefile
@@ -33,6 +33,9 @@ linear-y      += md-linear.o
 multipath-y    += md-multipath.o
 faulty-y       += md-faulty.o
 
+ploop-y                += dm-ploop-target.o dm-ploop-map.o dm-ploop-cmd.o \
+                  dm-ploop-bat.o
+
 # Note: link order is important.  All raid personalities
 # and must come before md.o, as they each initialise 
 # themselves, and md.o may use the personalities when it 
@@ -103,3 +106,5 @@ endif
 ifeq ($(CONFIG_DM_VERITY_VERIFY_ROOTHASH_SIG),y)
 dm-verity-objs                 += dm-verity-verify-sig.o
 endif
+
+obj-$(CONFIG_DM_PLOOP)         += ploop.o
diff --git a/drivers/md/dm-ploop-bat.c b/drivers/md/dm-ploop-bat.c
new file mode 100644
index 000000000000..44cbce6f6b40
--- /dev/null
+++ b/drivers/md/dm-ploop-bat.c
@@ -0,0 +1,597 @@
+/*
+ *  drivers/md/dm-ploop-bat.c
+ *
+ *  Copyright (c) 2020-2021 Virtuozzo International GmbH. All rights reserved.
+ *
+ */
+
+#include <linux/init.h>
+#include <linux/file.h>
+#include <linux/uio.h>
+#include <linux/mm.h>
+#include "dm-ploop.h"
+
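+/* Find the md page which caches BAT entries for page index @id. */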
+struct md_page *md_page_find(struct ploop *ploop, u32 id)
+{
+       struct rb_node *node;
+       struct md_page *md;
+
+       node = ploop->bat_entries.rb_node;
+
+       while (node) {
+               md = rb_entry(node, struct md_page, node);
+               if (id < md->id)
+                       node = node->rb_left;
+               else if (id > md->id)
+                       node = node->rb_right;
+               else
+                       return md;
+       }
+
+       return NULL;
+}
+
+static void __md_page_insert(struct rb_root *root, struct md_page *new_md)
+{
+       struct rb_node *parent, **node;
+       u32 new_id = new_md->id;
+       struct md_page *md;
+
+       node = &root->rb_node;
+       parent = NULL;
+
+       while (*node) {
+               parent = *node;
+               md = rb_entry(*node, struct md_page, node);
+               if (new_id < md->id)
+                       node = &parent->rb_left;
+               else if (new_id > md->id)
+                       node = &parent->rb_right;
+               else
+                       BUG();
+       }
+
+       rb_link_node(&new_md->node, parent, node);
+       rb_insert_color(&new_md->node, root);
+}
+
+void md_page_insert(struct ploop *ploop, struct md_page *new_md)
+{
+       __md_page_insert(&ploop->bat_entries, new_md);
+}
+
+static struct md_page *alloc_md_page(u32 id)
+{
+       struct md_page *md;
+       struct page *page;
+       unsigned int size;
+       u8 *levels;
+
+       md = kmalloc(sizeof(*md), GFP_KERNEL); /* FIXME: memcache */
+       if (!md)
+               return NULL;
+       size = sizeof(u8) * PAGE_SIZE / sizeof(map_index_t);
+       levels = kzalloc(size, GFP_KERNEL);
+       if (!levels)
+               goto err_levels;
+
+       page = alloc_page(GFP_KERNEL);
+       if (!page)
+               goto err_page;
+       INIT_LIST_HEAD(&md->wait_list);
+       INIT_LIST_HEAD(&md->wb_link);
+
+       md->status = 0;
+       md->bat_levels = levels;
+       md->piwb = NULL;
+       md->page = page;
+       md->id = id;
+       return md;
+err_page:
+       kfree(levels);
+err_levels:
+       kfree(md);
+       return NULL;
+}
+
+void ploop_free_md_page(struct md_page *md)
+{
+       put_page(md->page);
+       kfree(md->bat_levels);
+       kfree(md);
+}
+
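+/*
+ * Preallocate md pages covering BAT entries in (@nr_bat_entries,
+ * @new_nr_bat_entries] and insert them into @root. New entries are
+ * initialized to BAT_ENTRY_NONE.
+ */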
+int prealloc_md_pages(struct rb_root *root, u32 nr_bat_entries,
+                     u32 new_nr_bat_entries)
+{
+       u32 i, nr_pages, new_nr_pages;
+       struct md_page *md;
+       void *addr;
+
+       new_nr_pages = bat_clu_to_page_nr(new_nr_bat_entries - 1) + 1;
+       nr_pages = 0;
+       if (nr_bat_entries)
+               nr_pages = bat_clu_to_page_nr(nr_bat_entries - 1) + 1;
+
+       for (i = nr_pages; i < new_nr_pages; i++) {
+               md = alloc_md_page(i);
+               if (!md)
+                       return -ENOMEM;
+               addr = kmap_atomic(md->page);
+               memset32(addr, BAT_ENTRY_NONE, PAGE_SIZE / 4);
+               kunmap_atomic(addr);
+
+               __md_page_insert(root, md);
+       }
+
+       return 0;
+}
+
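+/*
+ * Update the cached BAT entry of @clu to @dst_clu, but only if the entry
+ * currently refers to delta @level. Returns false if the md page is not
+ * found or the level does not match.
+ */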
+bool try_update_bat_entry(struct ploop *ploop, u32 clu, u8 level, u32 dst_clu)
+{
+       u32 *bat_entries, id = bat_clu_to_page_nr(clu);
+       struct md_page *md = md_page_find(ploop, id);
+
+       lockdep_assert_held(&ploop->bat_rwlock);
+
+       if (!md)
+               return false;
+
+       clu = bat_clu_idx_in_page(clu); /* relative offset */
+
+       if (md->bat_levels[clu] == level) {
+               bat_entries = kmap_atomic(md->page);
+               bat_entries[clu] = dst_clu;
+               kunmap_atomic(bat_entries);
+               return true;
+       }
+       return false;
+}
+
+#if 0
+/*
+ * Clear all clusters, which are referred to in BAT, from holes_bitmap.
+ * Set bat_levels[] to top delta's level. Mark unmapped clusters as
+ * BAT_ENTRY_NONE.
+ */
+static int parse_bat_entries(struct ploop *ploop, map_index_t *bat_entries,
+                            u8 *bat_levels, unsigned int nr,
+                            unsigned int page_id, u8 nr_deltas)
+{
+       int i = 0;
+
+       if (page_id == 0)
+               i = PLOOP_MAP_OFFSET;
+
+       for (; i < nr; i++) {
+               if (bat_entries[i] == BAT_ENTRY_NONE)
+                       return -EINVAL;
+               if (bat_entries[i]) {
+                       bat_levels[i] = nr_deltas - 1; /* See top_level() */
+                       /* Cluster may refer outside holes_bitmap after shrinking */
+                       if (bat_entries[i] < ploop->hb_nr)
+                               ploop_hole_clear_bit(bat_entries[i], ploop);
+               } else {
+                       bat_entries[i] = BAT_ENTRY_NONE;
+               }
+       }
+
+       return 0;
+}
+
+/*
+ * Read from disk and fill bat_entries. Note, that on enter here, clu #0
+ * is already read from disk (with header) -- just parse bio pages content.
+ */
+int ploop_read_bat(struct ploop *ploop, struct bio *bio, u8 nr_deltas)
+{
+       unsigned int id, entries_per_page, nr_copy, nr_all, page, i = 0;
+       map_index_t *from, *to, clu = 0;
+       struct md_page *md;
+       int ret = 0;
+
+       entries_per_page = PAGE_SIZE / sizeof(map_index_t);
+       nr_all = ploop->nr_bat_entries + PLOOP_MAP_OFFSET;
+
+       do {
+               for (page = 0; page < nr_pages_in_cluster(ploop); page++) {
+                       id = i * sizeof(map_index_t) / PAGE_SIZE;
+                       md = md_page_find(ploop, id);
+                       if (WARN_ON_ONCE(!md)) {
+                               ret = -ENOENT;
+                               goto out;
+                       }
+
+                       nr_copy = entries_per_page;
+                       if (i + nr_copy > nr_all)
+                               nr_copy = nr_all - i;
+
+                       to = kmap(md->page);
+                       from = kmap(bio->bi_io_vec[page].bv_page);
+                       memcpy(to, from, nr_copy * sizeof(map_index_t));
+                       kunmap(bio->bi_io_vec[page].bv_page);
+                       if (unlikely(nr_copy < BAT_ENTRIES_PER_PAGE)) {
+                               memset(to + nr_copy, 0, sizeof(map_index_t) *
+                                      (BAT_ENTRIES_PER_PAGE - nr_copy));
+                       }
+
+                       ret = parse_bat_entries(ploop, to, md->bat_levels,
+                                               nr_copy, id, nr_deltas);
+                       kunmap(md->page);
+                       if (ret)
+                               goto out;
+
+                       i += nr_copy;
+                       if (i >= nr_all)
+                               goto out;
+               }
+
+               ret = ploop_read_cluster_sync(ploop, bio, ++clu);
+               if (ret)
+                       goto out;
+
+       } while (1);
+
+out:
+       return ret;
+}
+#endif
+
+/* Alloc holes_bitmap and set bits of free clusters */
+static int ploop_setup_holes_bitmap(struct ploop *ploop, u32 bat_clusters)
+{
+       u32 i, size;
+
+       /*
+        * The bitmap covers BAT clusters + the number of data clusters.
+        * Note that after a shrink of a large disk, ploop->bat_entries[x] may
+        * refer outside [0, ploop->hb_nr-1], and we never allocate
+        * holes_bitmap bits for such clusters. Just remember to skip these
+        * clusters after discard frees them.
+        */
+       ploop->hb_nr = bat_clusters + ploop->nr_bat_entries;
+       size = round_up(DIV_ROUND_UP(ploop->hb_nr, 8), sizeof(unsigned long));
+
+       /* holes_bitmap numbering is relative to the start of the file */
+       ploop->holes_bitmap = kvmalloc(size, GFP_KERNEL);
+       if (!ploop->holes_bitmap)
+               return -ENOMEM;
+       memset(ploop->holes_bitmap, 0xff, size);
+
+       /* Mark all BAT clusters as occupied. */
+       for (i = 0; i < bat_clusters; i++)
+               ploop_hole_clear_bit(i, ploop);
+
+       return 0;
+}
+
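+/*
+ * Parse the ploop1 header from the already read @page, validate it against
+ * the target length and clu size, and set up the holes_bitmap.
+ */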
+int ploop_setup_metadata(struct ploop *ploop, struct page *page)
+{
+       struct ploop_pvd_header *m_hdr = NULL;
+       u32 bat_clusters, offset_clusters;
+       struct dm_target *ti = ploop->ti;
+       unsigned long size;
+       int ret;
+
+       m_hdr = kmap(page);
+
+       ret = -ENOTSUPP;
+       if (strncmp(m_hdr->m_Sig, "WithouFreSpacExt", 16))
+               goto out;
+
+       ret = -ENOLCK;
+       if (m_hdr->m_DiskInUse != cpu_to_le32(SIGNATURE_DISK_IN_USE) &&
+           !ploop_is_ro(ploop) && !ignore_signature_disk_in_use)
+               goto out;
+
+       ret = -EINVAL;
+       if (le32_to_cpu(m_hdr->m_Sectors) != CLU_TO_SEC(ploop, 1))
+               goto out;
+
+       memcpy(ploop->m_Sig, m_hdr->m_Sig, sizeof(ploop->m_Sig));
+       ploop->m_Type = le32_to_cpu(m_hdr->m_Type);
+       ploop->m_Sectors = le32_to_cpu(m_hdr->m_Sectors);
+       ploop->nr_bat_entries = le32_to_cpu(m_hdr->m_Size);
+
+       /* Header and BAT-occupied clusters at start of file */
+       size = (PLOOP_MAP_OFFSET + ploop->nr_bat_entries) * sizeof(map_index_t);
+       bat_clusters = DIV_ROUND_UP(size, CLU_SIZE(ploop));
+
+       /* Clusters from start of file to first data block */
+       offset_clusters = SEC_TO_CLU(ploop, le32_to_cpu(m_hdr->m_FirstBlockOffset));
+       if (bat_clusters != offset_clusters) {
+               pr_err("ploop: custom FirstBlockOffset\n");
+               goto out;
+       }
+       ret = -EBADSLT;
+       if (le64_to_cpu(m_hdr->m_SizeInSectors_v2) < ti->len) {
+               pr_err("ploop: Too short BAT\n");
+               goto out;
+       }
+       kunmap(page);
+       m_hdr = NULL;
+
+       ret = ploop_setup_holes_bitmap(ploop, bat_clusters);
+out:
+       if (m_hdr)
+               kunmap(page);
+       return ret;
+}
+
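+/*
+ * Check that the delta header (the first md page of @md_root) is compatible
+ * with the top image: same signature, clu size and type, and a BAT which is
+ * not bigger than ours.
+ */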
+static int ploop_delta_check_header(struct ploop *ploop,
+                                   struct rb_root *md_root,
+                                   u32 *delta_nr_be_ret)
+{
+       u32 bytes, delta_nr_be, offset_clusters, bat_clusters;
+       struct md_page *md0 = md_first_entry(md_root);
+       struct ploop_pvd_header *d_hdr;
+       int ret = -EPROTO;
+
+       WARN_ON_ONCE(md0->id != 0);
+
+       d_hdr = kmap(md0->page);
+       if (memcmp(d_hdr->m_Sig, ploop->m_Sig, sizeof(d_hdr->m_Sig)) ||
+           d_hdr->m_Sectors != ploop->m_Sectors ||
+           d_hdr->m_Type != ploop->m_Type)
+               goto out;
+
+       delta_nr_be = le32_to_cpu(d_hdr->m_Size);
+       offset_clusters = SEC_TO_CLU(ploop, le32_to_cpu(d_hdr->m_FirstBlockOffset));
+       bytes = (PLOOP_MAP_OFFSET + delta_nr_be) * sizeof(map_index_t);
+       bat_clusters = DIV_ROUND_UP(bytes, CLU_SIZE(ploop));
+
+       if (delta_nr_be > ploop->nr_bat_entries ||
+           bat_clusters != offset_clusters)
+               goto out;
+
+       *delta_nr_be_ret = delta_nr_be;
+       ret = 0;
+out:
+       kunmap(md0->page);
+       return ret;
+}
+
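+/*
+ * Sanity check BAT entries read from a delta: an entry must not point past
+ * the end of the delta file or into its BAT area; zero entries are converted
+ * to BAT_ENTRY_NONE.
+ */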
+static int convert_bat_entries(struct ploop *ploop, struct rb_root *md_root,
+                              u32 nr_be, u32 nr_pages, loff_t file_size)
+{
+       u32 i, end, bytes, bat_clusters, page_id, *bat_entries, max_file_clu;
+       struct rb_node *node;
+       struct md_page *md;
+       int ret = 0;
+
+       bytes = (PLOOP_MAP_OFFSET + nr_be) * sizeof(map_index_t);
+       bat_clusters = DIV_ROUND_UP(bytes, CLU_SIZE(ploop));
+       max_file_clu = file_size / CLU_SIZE(ploop) - 1;
+
+       page_id = 0;
+       rb_root_for_each_md_page(md_root, md, node) {
+               bat_entries = kmap(md->page);
+               init_be_iter(nr_be, md->id, &i, &end);
+               WARN_ON_ONCE(page_id != md->id);
+               page_id++;
+
+               for (; i <= end; i++) {
+                       if (bat_entries[i] > max_file_clu)
+                               ret = -EPROTO;
+                       if (!bat_entries[i])
+                               bat_entries[i] = BAT_ENTRY_NONE;
+                       if (bat_entries[i] < bat_clusters)
+                               ret = -EXDEV;
+               }
+               kunmap(md->page);
+
+               if (ret || page_id == nr_pages)
+                       break;
+       }
+
+       return ret;
+}
+
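+/*
+ * Read the whole BAT of delta @file into newly allocated md pages linked
+ * to @md_root, and return the number of its BAT entries in @delta_nr_be_ret.
+ */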
+int ploop_read_delta_metadata(struct ploop *ploop, struct file *file,
+                             struct rb_root *md_root, u32 *delta_nr_be_ret)
+{
+       struct bio_vec bvec_on_stack, *bvec = &bvec_on_stack;
+       u32 i, size, delta_nr_be, nr_segs;
+       loff_t pos, file_size;
+       struct iov_iter iter;
+       struct rb_node *node;
+       struct md_page *md;
+       ssize_t len;
+       int ret;
+
+       ret = -ENOMEM;
+       if (prealloc_md_pages(md_root, 0, 1))
+               goto out;
+       bvec[0].bv_page = md_first_entry(md_root)->page;
+       bvec[0].bv_len = PAGE_SIZE;
+       bvec[0].bv_offset = 0;
+
+       iov_iter_bvec(&iter, READ, bvec, 1, PAGE_SIZE);
+       pos = 0;
+
+       len = vfs_iter_read(file, &iter, &pos, 0);
+       if (len != PAGE_SIZE) {
+               ret = len < 0 ? (int)len : -ENODATA;
+               goto out;
+       }
+
+       ret = ploop_delta_check_header(ploop, md_root, &delta_nr_be);
+       if (ret)
+               goto out;
+
+       size = (PLOOP_MAP_OFFSET + delta_nr_be) * sizeof(map_index_t);
+       size = ALIGN(size, PAGE_SIZE); /* the file may be opened for direct I/O */
+       nr_segs = size / PAGE_SIZE;
+
+       ret = -ENOMEM;
+       if (prealloc_md_pages(md_root, 1, delta_nr_be))
+               goto out;
+
+       bvec = kvmalloc(sizeof(*bvec) * nr_segs, GFP_KERNEL);
+       if (!bvec)
+               goto out;
+
+       ret = -EMLINK;
+       i = 0;
+       rb_root_for_each_md_page(md_root, md, node) {
+               if (WARN_ON_ONCE(md->id != i))
+                       goto out;
+               bvec[i].bv_page = md->page;
+               bvec[i].bv_len = PAGE_SIZE;
+               bvec[i].bv_offset = 0;
+               i++;
+       }
+
+       iov_iter_bvec(&iter, READ, bvec, nr_segs, size);
+       pos = 0;
+
+       len = vfs_iter_read(file, &iter, &pos, 0);
+       if (len != size) {
+               ret = len < 0 ? (int)len : -ENODATA;
+               goto out;
+       }
+
+       file_size = i_size_read(file->f_mapping->host);
+
+       ret = convert_bat_entries(ploop, md_root, delta_nr_be, nr_segs, file_size);
+
+       *delta_nr_be_ret = delta_nr_be;
+out:
+       if (ret)
+               free_md_pages_tree(md_root);
+       if (bvec != &bvec_on_stack)
+               kvfree(bvec);
+       return ret;
+}
+
+static void ploop_set_not_hole(struct ploop *ploop, u32 dst_clu)
+{
+       /* Cluster may refer out holes_bitmap after shrinking */
+       if (dst_clu < ploop->hb_nr)
+               ploop_hole_clear_bit(dst_clu, ploop);
+}
+
+/*
+ * Prefer the first added delta, since the order is:
+ * 1) add top device
+ * 2) add newest delta
+ * ...
+ * n) add oldest delta
+ */
+static void apply_delta_mappings(struct ploop *ploop, struct ploop_delta *deltas,
+                                u32 level, struct rb_root *md_root, u64 size_in_clus)
+{
+       map_index_t *bat_entries, *d_bat_entries = NULL;
+       bool is_top_level, is_raw, stop = false;
+       struct md_page *md, *d_md = NULL;
+       u32 i, end, dst_clu, clu;
+       struct rb_node *node;
+
+       is_raw = deltas[level].is_raw;
+       is_top_level = (level == top_level(ploop));
+
+       if (!is_raw)
+               d_md = md_first_entry(md_root);
+
+       write_lock_irq(&ploop->bat_rwlock);
+       ploop_for_each_md_page(ploop, md, node) {
+               bat_entries = kmap_atomic(md->page);
+               if (!is_raw)
+                       d_bat_entries = kmap_atomic(d_md->page);
+
+               if (is_top_level && md->id == 0 && !is_raw) {
+                       /* bat_entries before PLOOP_MAP_OFFSET is hdr */
+                       memcpy(bat_entries, d_bat_entries,
+                              sizeof(struct ploop_pvd_header));
+               }
+
+               init_be_iter(size_in_clus, md->id, &i, &end);
+
+               for (; i <= end; i++) {
+                       clu = page_clu_idx_to_bat_clu(md->id, i);
+                       if (clu == size_in_clus - 1)
+                               stop = true;
+
+                       if (bat_entries[i] != BAT_ENTRY_NONE) {
+                               /* md0 is already populated */
+                               WARN_ON_ONCE(md->id && is_top_level);
+                               goto set_not_hole;
+                       }
+
+                       if (!is_raw)
+                               dst_clu = d_bat_entries[i];
+                       else
+                               dst_clu = clu;
+
+                       if (dst_clu == BAT_ENTRY_NONE)
+                               continue;
+                       md->bat_levels[i] = level;
+                       bat_entries[i] = dst_clu;
+set_not_hole:
+                       if (is_top_level)
+                               ploop_set_not_hole(ploop, bat_entries[i]);
+               }
+
+               kunmap_atomic(bat_entries);
+               if (!is_raw)
+                       kunmap_atomic(d_bat_entries);
+               if (stop)
+                       break;
+               if (!is_raw)
+                       d_md = md_next_entry(d_md);
+       }
+       write_unlock_irq(&ploop->bat_rwlock);
+}
+
+int ploop_check_delta_length(struct ploop *ploop, struct file *file, loff_t *file_size)
+{
+       loff_t loff = i_size_read(file->f_mapping->host);
+
+       if (loff & (CLU_SIZE(ploop) - 1))
+               return -EPROTO;
+       *file_size = loff;
+       return 0;
+}
+
+/*
+ * @file refers to a new delta, which is placed right before the top_delta.
+ * So, userspace has to populate the deltas stack from oldest to newest.
+ */
+int ploop_add_delta(struct ploop *ploop, u32 level, struct file *file, bool is_raw)
+{
+       struct ploop_delta *deltas = ploop->deltas;
+       struct rb_root md_root = RB_ROOT;
+       loff_t file_size;
+       u32 size_in_clus;
+       int ret;
+
+       ret = ploop_check_delta_length(ploop, file, &file_size);
+       if (ret)
+               goto out;
+
+       if (!is_raw) {
+               ret = ploop_read_delta_metadata(ploop, file, &md_root,
+                                               &size_in_clus);
+               if (ret)
+                       goto out;
+       } else {
+               size_in_clus = POS_TO_CLU(ploop, file_size);
+       }
+
+       ret = -EBADSLT;
+       if (level != top_level(ploop) &&
+           size_in_clus > deltas[level + 1].size_in_clus)
+               goto out;
+
+       apply_delta_mappings(ploop, deltas, level, &md_root, size_in_clus);
+
+       deltas[level].file = file;
+       deltas[level].file_size = file_size;
+       deltas[level].file_preallocated_area_start = file_size;
+       deltas[level].size_in_clus = size_in_clus;
+       deltas[level].is_raw = is_raw;
+       ret = 0;
+out:
+       free_md_pages_tree(&md_root);
+       return ret;
+}
diff --git a/drivers/md/dm-ploop-cmd.c b/drivers/md/dm-ploop-cmd.c
new file mode 100644
index 000000000000..3ba866cb0ec0
--- /dev/null
+++ b/drivers/md/dm-ploop-cmd.c
@@ -0,0 +1,1341 @@
+/*
+ *  drivers/md/dm-ploop-cmd.c
+ *
+ *  Copyright (c) 2020-2021 Virtuozzo International GmbH. All rights reserved.
+ *
+ */
+
+#include <linux/init.h>
+#include <linux/file.h>
+#include <linux/uio.h>
+#include <linux/ctype.h>
+#include <linux/umh.h>
+#include <linux/sched/signal.h>
+#include "dm-ploop.h"
+
+#define DM_MSG_PREFIX "ploop"
+#define PLOOP_DEBUG
+
+/*
+ * Assign newly allocated memory for BAT array and holes_bitmap
+ * before grow.
+ */
+static void ploop_advance_holes_bitmap(struct ploop *ploop,
+                                      struct ploop_cmd *cmd)
+{
+       u32 i, end, size, dst_clu, *bat_entries;
+       struct rb_node *node;
+       struct md_page *md;
+
+       /* This is called only once */
+       if (cmd->resize.stage != PLOOP_GROW_STAGE_INITIAL)
+               return;
+       cmd->resize.stage++;
+
+       write_lock_irq(&ploop->bat_rwlock);
+       /* Copy and swap holes_bitmap */
+       size = DIV_ROUND_UP(ploop->hb_nr, 8);
+       memcpy(cmd->resize.holes_bitmap, ploop->holes_bitmap, size);
+       swap(cmd->resize.holes_bitmap, ploop->holes_bitmap);
+       for (i = ploop->hb_nr; i < size * 8; i++)
+               set_bit(i, ploop->holes_bitmap);
+       swap(cmd->resize.hb_nr, ploop->hb_nr);
+       ploop_for_each_md_page(ploop, md, node) {
+               ploop_init_be_iter(ploop, md->id, &i, &end);
+               bat_entries = kmap_atomic(md->page);
+               for (; i <= end; i++) {
+                       if (!md_page_cluster_is_in_top_delta(ploop, md, i))
+                               continue;
+                       dst_clu = bat_entries[i];
+                       /* This may happen after grow->shrink->(now) grow */
+                       if (dst_clu < ploop->hb_nr &&
+                           test_bit(dst_clu, ploop->holes_bitmap)) {
+                               ploop_hole_clear_bit(dst_clu, ploop);
+                       }
+               }
+               kunmap_atomic(bat_entries);
+       }
+       write_unlock_irq(&ploop->bat_rwlock);
+}
+
+static int wait_for_completion_maybe_killable(struct completion *comp,
+                                             bool killable)
+{
+       int ret = 0;
+
+       if (killable) {
+               ret = wait_for_completion_killable_timeout(comp, PLOOP_INFLIGHT_TIMEOUT);
+               if (!ret)
+                       ret = -ETIMEDOUT;
+               else if (ret > 0)
+                       ret = 0;
+       } else {
+               wait_for_completion(comp);
+       }
+
+       return ret;
+}
+
+/*
+ * Switch index of ploop->inflight_bios_ref[] and wait till inflight
+ * bios are completed. This waits for completion of simple submitted
+ * actions like a write to origin_dev or a read from a delta, but it never
+ * guarantees completion of complex actions like "data write + index
+ * writeback" (for index protection look at clu locks). This is
+ * weaker than "dmsetup suspend".
+ * It is called from kwork only, so this can't be executed in parallel.
+ */
+static int ploop_inflight_bios_ref_switch(struct ploop *ploop, bool killable)
+{
+       struct completion *comp = &ploop->inflight_bios_ref_comp;
+       unsigned int index = ploop->inflight_bios_ref_index;
+       int ret;
+
+       WARN_ON_ONCE(current->flags & PF_WQ_WORKER);
+
+       if (ploop->inflight_ref_comp_pending) {
+               /* Previous completion was interrupted */
+               ret = wait_for_completion_maybe_killable(comp, killable);
+               if (ret)
+                       return ret;
+               ploop->inflight_ref_comp_pending = false;
+               percpu_ref_reinit(&ploop->inflight_bios_ref[!index]);
+       }
+
+       init_completion(comp);
+
+       spin_lock_irq(&ploop->deferred_lock);
+       ploop->inflight_bios_ref_index = !index;
+       spin_unlock_irq(&ploop->deferred_lock);
+
+       percpu_ref_kill(&ploop->inflight_bios_ref[index]);
+
+       ret = wait_for_completion_maybe_killable(comp, killable);
+       if (ret) {
+               ploop->inflight_ref_comp_pending = true;
+               return ret;
+       }
+
+       percpu_ref_reinit(&ploop->inflight_bios_ref[index]);
+       return 0;
+}
+
+static void ploop_resume_submitting_pios(struct ploop *ploop)
+{
+       LIST_HEAD(list);
+
+       spin_lock_irq(&ploop->deferred_lock);
+       WARN_ON_ONCE(!ploop->stop_submitting_pios);
+       ploop->stop_submitting_pios = false;
+       list_splice_tail_init(&ploop->suspended_pios, &list);
+       spin_unlock_irq(&ploop->deferred_lock);
+
+       submit_embedded_pios(ploop, &list);
+}
+
+static int ploop_suspend_submitting_pios(struct ploop *ploop)
+{
+       int ret;
+
+       spin_lock_irq(&ploop->deferred_lock);
+       WARN_ON_ONCE(ploop->stop_submitting_pios);
+       ploop->stop_submitting_pios = true;
+       spin_unlock_irq(&ploop->deferred_lock);
+
+       ret = ploop_inflight_bios_ref_switch(ploop, true);
+       if (ret)
+               ploop_resume_submitting_pios(ploop);
+       return ret;
+}
+
+/* Find existing BAT clu pointing to dst_clu */
+static u32 ploop_find_bat_entry(struct ploop *ploop, u32 dst_clu, bool *is_locked)
+{
+       u32 i, end, *bat_entries, clu = UINT_MAX;
+       struct rb_node *node;
+       struct md_page *md;
+
+       read_lock_irq(&ploop->bat_rwlock);
+       ploop_for_each_md_page(ploop, md, node) {
+               ploop_init_be_iter(ploop, md->id, &i, &end);
+               bat_entries = kmap_atomic(md->page);
+               for (; i <= end; i++) {
+                       if (bat_entries[i] != dst_clu)
+                               continue;
+                       if (md_page_cluster_is_in_top_delta(ploop, md, i)) {
+                               clu = page_clu_idx_to_bat_clu(md->id, i);
+                               break;
+                       }
+               }
+               kunmap_atomic(bat_entries);
+               if (clu != UINT_MAX)
+                       break;
+       }
+       read_unlock_irq(&ploop->bat_rwlock);
+
+       *is_locked = false;
+       if (clu != UINT_MAX) {
+               spin_lock_irq(&ploop->deferred_lock);
+               *is_locked = find_lk_of_cluster(ploop, clu);
+               spin_unlock_irq(&ploop->deferred_lock);
+       }
+
+       return clu;
+}
+
+void pio_prepare_offsets(struct ploop *ploop, struct pio *pio, u32 clu)
+{
+       int i, nr_pages = nr_pages_in_cluster(ploop);
+
+       pio->bi_iter.bi_idx = 0;
+       pio->bi_iter.bi_bvec_done = 0;
+       pio->bi_vcnt = nr_pages;
+
+       for (i = 0; i < nr_pages; i++) {
+               pio->bi_io_vec[i].bv_offset = 0;
+               pio->bi_io_vec[i].bv_len = PAGE_SIZE;
+       }
+       pio->bi_iter.bi_sector = CLU_TO_SEC(ploop, clu);
+       pio->bi_iter.bi_size = CLU_SIZE(ploop);
+}
+
+static void wake_completion(struct pio *pio, void *data, blk_status_t status)
+{
+       struct completion *completion = data;
+
+       complete(completion);
+}
+
+static int ploop_read_cluster_sync(struct ploop *ploop, struct pio *pio,
+                                  u32 dst_clu)
+{
+       DECLARE_COMPLETION_ONSTACK(completion);
+
+       init_pio(ploop, REQ_OP_READ, pio);
+       pio_prepare_offsets(ploop, pio, dst_clu);
+
+       pio->endio_cb = wake_completion;
+       pio->endio_cb_data = &completion;
+
+       map_and_submit_rw(ploop, dst_clu, pio, top_level(ploop));
+       wait_for_completion(&completion);
+
+       if (pio->bi_status)
+               return blk_status_to_errno(pio->bi_status);
+
+       return 0;
+}
+
+static int ploop_write_cluster_sync(struct ploop *ploop, struct pio *pio,
+                                   u32 dst_clu)
+{
+       struct file *file = top_delta(ploop)->file;
+       DECLARE_COMPLETION_ONSTACK(completion);
+       int ret;
+
+       ret = vfs_fsync(file, 0);
+       if (ret)
+               return ret;
+
+       init_pio(ploop, REQ_OP_WRITE, pio);
+       pio_prepare_offsets(ploop, pio, dst_clu);
+
+       pio->endio_cb = wake_completion;
+       pio->endio_cb_data = &completion;
+
+       map_and_submit_rw(ploop, dst_clu, pio, top_level(ploop));
+       wait_for_completion(&completion);
+
+       if (pio->bi_status)
+               return blk_status_to_errno(pio->bi_status);
+
+       /* track_bio(ploop, bio); */
+       return vfs_fsync(file, 0);
+}
+
+static int ploop_write_zero_cluster_sync(struct ploop *ploop,
+                                        struct pio *pio, u32 clu)
+{
+       void *data;
+       int i;
+
+       for (i = 0; i < pio->bi_vcnt; i++) {
+               data = kmap_atomic(pio->bi_io_vec[i].bv_page);
+               memset(data, 0, PAGE_SIZE);
+               kunmap_atomic(data);
+       }
+
+       return ploop_write_cluster_sync(ploop, pio, clu);
+}
+
+static void ploop_make_md_wb(struct ploop *ploop, struct md_page *md)
+{
+       write_lock_irq(&ploop->bat_rwlock);
+       md->status |= MD_WRITEBACK;
+       write_unlock_irq(&ploop->bat_rwlock);
+}
+
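+/*
+ * Relocate the data stored in cmd->resize.dst_clu (which the grown BAT will
+ * occupy) into a newly allocated clu, update the index, and zero the old
+ * place on disk.
+ */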
+static int ploop_grow_relocate_cluster(struct ploop *ploop,
+                                      struct ploop_cmd *cmd)
+{
+       struct pio *pio = cmd->resize.pio;
+       struct ploop_index_wb *piwb;
+       u32 new_dst, clu, dst_clu;
+       struct completion comp;
+       blk_status_t bi_status;
+       struct md_page *md;
+       bool is_locked;
+       int ret = 0;
+
+       dst_clu = cmd->resize.dst_clu;
+
+       /* Relocate clu and update index */
+       clu = ploop_find_bat_entry(ploop, dst_clu, &is_locked);
+       if (clu == UINT_MAX || is_locked) {
+               /* dst_clu in top delta is not occupied? */
+               if (!test_bit(dst_clu, ploop->holes_bitmap) || is_locked) {
+                       WARN_ON_ONCE(1);
+                       ret = -EIO;
+                       goto out;
+               }
+               /* Cluster is free, occupy it. Skip relocation. */
+               ploop_hole_clear_bit(dst_clu, ploop);
+               goto not_occupied;
+       }
+
+       /* Read full clu sync */
+       ret = ploop_read_cluster_sync(ploop, pio, dst_clu);
+       if (ret < 0)
+               goto out;
+
+       ret = ploop_prepare_reloc_index_wb(ploop, &md, clu, &new_dst);
+       if (ret < 0)
+               goto out;
+       piwb = md->piwb;
+
+       /* Write clu to new destination */
+       ret = ploop_write_cluster_sync(ploop, pio, new_dst);
+       if (ret) {
+               ploop_break_bat_update(ploop, md);
+               goto out;
+       }
+
+       ploop_make_md_wb(ploop, md);
+       init_completion(&comp);
+       piwb->comp = &comp;
+       piwb->comp_bi_status = &bi_status;
+       /* Write new index on disk */
+       ploop_index_wb_submit(ploop, piwb);
+       wait_for_completion(&comp);
+
+       ret = blk_status_to_errno(bi_status);
+       if (ret)
+               goto out;
+
+       /* Update local BAT copy */
+       write_lock_irq(&ploop->bat_rwlock);
+       WARN_ON(!try_update_bat_entry(ploop, clu, top_level(ploop), new_dst));
+       write_unlock_irq(&ploop->bat_rwlock);
+not_occupied:
+       /*
+        * Now dst_clu is not referenced in the BAT, so increase the value
+        * for the next iteration. The place we do this is significant: the
+        * caller performs rollback based on this value.
+        */
+       cmd->resize.dst_clu++;
+
+       /* Zero new BAT entries on disk. */
+       ret = ploop_write_zero_cluster_sync(ploop, pio, dst_clu);
+out:
+       return ret;
+}
+
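+/*
+ * Write the updated ploop1 header (new size, size in sectors and first block
+ * offset) to disk, then update the cached md0 page on success.
+ */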
+static int ploop_grow_update_header(struct ploop *ploop,
+                                   struct ploop_cmd *cmd)
+{
+       unsigned int size, first_block_off;
+       struct ploop_pvd_header *hdr;
+       struct ploop_index_wb *piwb;
+       u32 nr_be, offset, clus;
+       struct completion comp;
+       blk_status_t bi_status;
+       struct md_page *md;
+       u64 sectors;
+       int ret;
+
+       /* hdr is in the same page as bat_entries[0] index */
+       ret = ploop_prepare_reloc_index_wb(ploop, &md, 0, NULL);
+       if (ret)
+               return ret;
+       piwb = md->piwb;
+
+       size = (PLOOP_MAP_OFFSET + cmd->resize.nr_bat_entries);
+       size *= sizeof(map_index_t);
+       clus = DIV_ROUND_UP(size, CLU_SIZE(ploop));
+       first_block_off = CLU_TO_SEC(ploop, clus);
+
+       hdr = kmap_atomic(piwb->bat_page);
+       /* TODO: head and cylinders */
+       nr_be = hdr->m_Size = cpu_to_le32(cmd->resize.nr_bat_entries);
+       sectors = hdr->m_SizeInSectors_v2 = cpu_to_le64(cmd->resize.new_sectors);
+       offset = hdr->m_FirstBlockOffset = cpu_to_le32(first_block_off);
+       kunmap_atomic(hdr);
+
+       ploop_make_md_wb(ploop, md);
+       init_completion(&comp);
+       piwb->comp = &comp;
+       piwb->comp_bi_status = &bi_status;
+       ploop_index_wb_submit(ploop, piwb);
+       wait_for_completion(&comp);
+
+       ret = blk_status_to_errno(bi_status);
+       if (!ret) {
+               /* Now update our cached page */
+               hdr = kmap_atomic(cmd->resize.md0->page);
+               hdr->m_Size = nr_be;
+               hdr->m_SizeInSectors_v2 = sectors;
+               hdr->m_FirstBlockOffset = offset;
+               kunmap_atomic(hdr);
+       }
+
+       return ret;
+}
+
+static void ploop_add_md_pages(struct ploop *ploop, struct rb_root *from)
+{
+       struct rb_node *node;
+       struct md_page *md;
+
+       while ((node = from->rb_node) != NULL) {
+               md = rb_entry(node, struct md_page, node);
+               rb_erase(node, from);
+               md_page_insert(ploop, md);
+       }
+}
+
+/*
+ * Here we relocate data clusters, which may intersect with the BAT area
+ * of the disk after resize. For the user they look like data already written
+ * to disk, so be careful(!) and protective. Update indexes only after the
+ * clu data is written to disk.
+ */
+static int process_resize_cmd(struct ploop *ploop, struct ploop_cmd *cmd)
+{
+       u32 dst_clu;
+       int ret = 0;
+
+       /* Update memory arrays and hb_nr, but do not update nr_bat_entries. */
+       ploop_advance_holes_bitmap(ploop, cmd);
+
+       while (cmd->resize.dst_clu <= cmd->resize.end_dst_clu) {
+               ret = ploop_grow_relocate_cluster(ploop, cmd);
+               if (ret)
+                       goto out;
+       }
+
+       /* Update header metadata */
+       ret = ploop_grow_update_header(ploop, cmd);
+out:
+       write_lock_irq(&ploop->bat_rwlock);
+       if (ret) {
+               /* Cleanup: mark new BAT overages as free clusters */
+               dst_clu = cmd->resize.dst_clu - 1;
+
+               while (dst_clu >= cmd->resize.nr_old_bat_clu) {
+                       ploop_hole_set_bit(dst_clu, ploop);
+                       dst_clu--;
+               }
+               swap(ploop->hb_nr, cmd->resize.hb_nr);
+       } else {
+               ploop_add_md_pages(ploop, &cmd->resize.md_pages_root);
+               swap(ploop->nr_bat_entries, cmd->resize.nr_bat_entries);
+       }
+       write_unlock_irq(&ploop->bat_rwlock);
+
+       return ret;
+}
+
+struct pio *alloc_pio_with_pages(struct ploop *ploop)
+{
+       int i, nr_pages = nr_pages_in_cluster(ploop);
+       struct pio *pio;
+       u32 size;
+
+       size = sizeof(*pio) + sizeof(*pio->bi_io_vec) * nr_pages;
+       pio = kmalloc(size, GFP_NOIO);
+       if (!pio)
+               return NULL;
+       pio->bi_io_vec = (void *)(pio + 1);
+
+       for (i = 0; i < nr_pages; i++) {
+               pio->bi_io_vec[i].bv_page = alloc_page(GFP_NOIO);
+               if (!pio->bi_io_vec[i].bv_page)
+                       goto err;
+               pio->bi_io_vec[i].bv_offset = 0;
+               pio->bi_io_vec[i].bv_len = PAGE_SIZE;
+       }
+
+       pio->bi_vcnt = nr_pages;
+       pio->bi_iter.bi_size = CLU_SIZE(ploop);
+
+       return pio;
+err:
+       while (i-- > 0)
+               put_page(pio->bi_io_vec[i].bv_page);
+       kfree(pio);
+       return NULL;
+}
+
+void free_pio_with_pages(struct ploop *ploop, struct pio *pio)
+{
+       int i, nr_pages = pio->bi_vcnt;
+       struct page *page;
+
+       /*
+        * Not an error for this function, but the rest of the code
+        * may expect this. Sanity check.
+        */
+       WARN_ON_ONCE(nr_pages != nr_pages_in_cluster(ploop));
+
+       for (i = 0; i < nr_pages; i++) {
+               page = pio->bi_io_vec[i].bv_page;
+               put_page(page);
+       }
+
+       kfree(pio);
+}
+
+/* @new_sectors is given in sectors */
+/* TODO: we may delegate this to userspace */
+static int ploop_resize(struct ploop *ploop, sector_t new_sectors)
+{
+       struct ploop_cmd cmd = { .resize.md_pages_root = RB_ROOT };
+       u32 nr_bat_entries, nr_old_bat_clusters, nr_bat_clusters;
+       unsigned int hb_nr, size, old_size;
+       struct ploop_pvd_header *hdr;
+       sector_t old_sectors;
+       struct md_page *md0;
+       int ret = -ENOMEM;
+
+       if (ploop->maintaince)
+               return -EBUSY;
+       if (ploop_is_ro(ploop))
+               return -EROFS;
+
+       md0 = md_page_find(ploop, 0);
+       if (WARN_ON(!md0))
+               return -EIO;
+       hdr = kmap(md0->page);
+       old_sectors = le64_to_cpu(hdr->m_SizeInSectors_v2);
+       kunmap(md0->page);
+
+       if (old_sectors == new_sectors)
+               return 0;
+       if (old_sectors > new_sectors) {
+               DMWARN("online shrink is not supported");
+               return -EINVAL;
+       } else if (SEC_TO_CLU(ploop, new_sectors) >= UINT_MAX - 2) {
+               DMWARN("resize: too large size is requested");
+               return -EINVAL;
+       } else if (new_sectors & (CLU_TO_SEC(ploop, 1) - 1)) {
+               DMWARN("resize: new_sectors is not aligned");
+               return -EINVAL;
+       }
+
+       nr_bat_entries = SEC_TO_CLU(ploop, new_sectors);
+
+       /* Memory for new md pages */
+       if (prealloc_md_pages(&cmd.resize.md_pages_root,
+                             ploop->nr_bat_entries, nr_bat_entries) < 0)
+               goto err;
+
+       size = (PLOOP_MAP_OFFSET + nr_bat_entries) * sizeof(map_index_t);
+       nr_bat_clusters = DIV_ROUND_UP(size, CLU_SIZE(ploop));
+       hb_nr = nr_bat_clusters + nr_bat_entries;
+       size = round_up(DIV_ROUND_UP(hb_nr, 8), sizeof(unsigned long));
+
+       /* Currently occupied bat clusters */
+       nr_old_bat_clusters = ploop_nr_bat_clusters(ploop,
+                                                   ploop->nr_bat_entries);
+       /* Memory for holes_bitmap */
+       cmd.resize.holes_bitmap = kvmalloc(size, GFP_KERNEL);
+       if (!cmd.resize.holes_bitmap)
+               goto err;
+
+       /* Mark all new bitmap memory as holes */
+       old_size = DIV_ROUND_UP(ploop->hb_nr, 8);
+       memset(cmd.resize.holes_bitmap + old_size, 0xff, size - old_size);
+
+       cmd.resize.pio = alloc_pio_with_pages(ploop);
+       if (!cmd.resize.pio)
+               goto err;
+
+       cmd.resize.clu = UINT_MAX;
+       cmd.resize.dst_clu = nr_old_bat_clusters;
+       cmd.resize.end_dst_clu = nr_bat_clusters - 1;
+       cmd.resize.nr_old_bat_clu = nr_old_bat_clusters;
+       cmd.resize.nr_bat_entries = nr_bat_entries;
+       cmd.resize.hb_nr = hb_nr;
+       cmd.resize.new_sectors = new_sectors;
+       cmd.resize.md0 = md0;
+
+       ploop_suspend_submitting_pios(ploop);
+       ret = process_resize_cmd(ploop, &cmd);
+       ploop_resume_submitting_pios(ploop);
+err:
+       if (cmd.resize.pio)
+               free_pio_with_pages(ploop, cmd.resize.pio);
+       kvfree(cmd.resize.holes_bitmap);
+       free_md_pages_tree(&cmd.resize.md_pages_root);
+       return ret;
+}
+
+static void service_pio_endio(struct pio *pio, void *data, blk_status_t status)
+{
+       struct ploop *ploop = pio->ploop;
+       blk_status_t *status_ptr = data;
+       unsigned long flags;
+
+       if (unlikely(status)) {
+               spin_lock_irqsave(&ploop->err_status_lock, flags);
+               *status_ptr = status;
+               spin_unlock_irqrestore(&ploop->err_status_lock, flags);
+       }
+
+       if (atomic_dec_return(&ploop->service_pios) < MERGE_PIOS_MAX / 2)
+               wake_up(&ploop->service_wq);
+}
+
+static int process_merge_latest_snapshot(struct ploop *ploop)
+{
+       static blk_status_t service_status;
+       struct bio_vec bvec = {0};
+       struct pio *pio;
+       int ret = 0;
+       u32 clu;
+
+       for (clu = 0; clu < ploop->nr_bat_entries; clu++) {
+               if (fatal_signal_pending(current)) {
+                       ret = -EINTR;
+                       break;
+               }
+               pio = alloc_pio(ploop, GFP_KERNEL);
+               if (!pio) {
+                       ret = -ENOMEM;
+                       break;
+               }
+               init_pio(ploop, REQ_OP_WRITE, pio);
+               pio->free_on_endio = true;
+               pio->bi_io_vec = &bvec;
+               pio->bi_iter.bi_sector = CLU_TO_SEC(ploop, clu);
+               pio->bi_iter.bi_size = 0;
+               pio->bi_iter.bi_idx = 0;
+               pio->bi_iter.bi_bvec_done = 0;
+               pio->endio_cb = service_pio_endio;
+               pio->endio_cb_data = &service_status;
+               pio->is_fake_merge = true;
+               WARN_ON_ONCE(!fake_merge_pio(pio));
+
+               dispatch_pios(ploop, pio, NULL);
+
+               if (atomic_inc_return(&ploop->service_pios) == MERGE_PIOS_MAX) {
+                       wait_event(ploop->service_wq,
+                                       atomic_read(&ploop->service_pios) < MERGE_PIOS_MAX);
+               }
+
+               if (unlikely(READ_ONCE(service_status)))
+                       break;
+       }
+
+       wait_event(ploop->service_wq, !atomic_read(&ploop->service_pios));
+       if (!ret) {
+               spin_lock_irq(&ploop->err_status_lock);
+               ret = blk_status_to_errno(service_status);
+               spin_unlock_irq(&ploop->err_status_lock);
+       }
+
+       return ret;
+}
+
+static int ploop_merge_latest_snapshot(struct ploop *ploop)
+{
+       struct file *file;
+       u8 level;
+       int ret;
+
+       if (ploop->maintaince)
+               return -EBUSY;
+       if (ploop_is_ro(ploop))
+               return -EROFS;
+       if (ploop->nr_deltas < 2)
+               return -ENOENT;
+
+       ret = process_merge_latest_snapshot(ploop);
+       if (ret)
+               goto out;
+
+       /* Delta merged. Release delta's file */
+       ret = ploop_suspend_submitting_pios(ploop);
+       if (ret)
+               goto out;
+
+       write_lock_irq(&ploop->bat_rwlock);
+       level = ploop->nr_deltas - 2;
+       file = ploop->deltas[level].file;
+       ploop->deltas[level] = ploop->deltas[level + 1];
+       ploop->nr_deltas--;
+       write_unlock_irq(&ploop->bat_rwlock);
+       fput(file);
+
+       ploop_resume_submitting_pios(ploop);
+out:
+       return ret;
+}
+
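+/*
+ * Switch BAT entries referring to the merged delta @level to the delta the
+ * clusters were copied to, then drop @level from the deltas array and
+ * renumber the deltas above it.
+ */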
+static void notify_delta_merged(struct ploop *ploop, u8 level,
+                               struct rb_root *md_root,
+                               bool forward, u32 size_in_clus)
+{
+       u32 i, end, *bat_entries, *d_bat_entries;
+       struct md_page *md, *d_md;
+       struct rb_node *node;
+       struct file *file;
+       bool stop = false;
+       u32 clu;
+
+       d_md = md_first_entry(md_root);
+
+       write_lock_irq(&ploop->bat_rwlock);
+       ploop_for_each_md_page(ploop, md, node) {
+               init_be_iter(size_in_clus, md->id, &i, &end);
+               bat_entries = kmap_atomic(md->page);
+               d_bat_entries = kmap_atomic(d_md->page);
+               for (; i <= end; i++) {
+                       clu = page_clu_idx_to_bat_clu(md->id, i);
+                       if (clu == size_in_clus - 1)
+                               stop = true;
+
+                       /* deltas above @level become renumbered */
+                       if (bat_entries[i] != BAT_ENTRY_NONE &&
+                           md->bat_levels[i] > level) {
+                               md->bat_levels[i]--;
+                               continue;
+                       }
+
+                       if (bat_entries[i] != BAT_ENTRY_NONE &&
+                            md->bat_levels[i] < level)
+                               continue;
+
+                       if (d_bat_entries[i] == BAT_ENTRY_NONE) {
+                               WARN_ON_ONCE(bat_entries[i] != BAT_ENTRY_NONE);
+                               continue;
+                       }
+
+                       /*
+                        * Clusters from deltas of @level now point to
+                        * 1) the next delta (which became renumbered) or
+                        * 2) the prev delta (if !@forward).
+                        */
+                       bat_entries[i] = d_bat_entries[i];
+                       if (!forward)
+                               md->bat_levels[i] = level - 1;
+                       else
+                               md->bat_levels[i] = level;
+               }
+               kunmap_atomic(bat_entries);
+               kunmap_atomic(d_bat_entries);
+               if (stop)
+                       break;
+               d_md = md_next_entry(d_md);
+       }
+
+       file = ploop->deltas[level].file;
+       /* Renumber deltas above @level */
+       for (i = level + 1; i < ploop->nr_deltas; i++)
+               ploop->deltas[i - 1] = ploop->deltas[i];
+       memset(&ploop->deltas[--ploop->nr_deltas], 0,
+              sizeof(struct ploop_delta));
+       write_unlock_irq(&ploop->bat_rwlock);
+       fput(file);
+}
+
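+/*
+ * @map is a string of "clu:dst_clu;" pairs. First validate all of them,
+ * then commit the new index values via try_update_bat_entry().
+ */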
+static int process_update_delta_index(struct ploop *ploop, u8 level,
+                                     const char *map)
+{
+       u32 clu, dst_clu, n;
+       int ret;
+
+       write_lock_irq(&ploop->bat_rwlock);
+       /* Check all */
+       while (sscanf(map, "%u:%u;%n", &clu, &dst_clu, &n) == 2) {
+               if (clu >= ploop->nr_bat_entries)
+                       break;
+               if (ploop_bat_entries(ploop, clu, NULL, NULL) == BAT_ENTRY_NONE)
+                       break;
+               map += n;
+       }
+       if (map[0] != '\0') {
+               ret = -EINVAL;
+               goto unlock;
+       }
+       /* Commit all */
+       while (sscanf(map, "%u:%u;%n", &clu, &dst_clu, &n) == 2) {
+               try_update_bat_entry(ploop, clu, level, dst_clu);
+               map += n;
+       }
+       ret = 0;
+unlock:
+       write_unlock_irq(&ploop->bat_rwlock);
+       return ret;
+}
+
+static int ploop_delta_clusters_merged(struct ploop *ploop, u8 level,
+                                      bool forward)
+{
+       struct ploop_delta *deltas = ploop->deltas;
+       struct rb_root md_root = RB_ROOT;
+       struct file *file;
+       loff_t file_size;
+       u32 size_in_clus;
+       u8 changed_level;
+       int ret;
+
+       /* Reread BAT of deltas[@level + 1] (or [@level - 1]) */
+       changed_level = level + (forward ? 1 : -1);
+       file = deltas[changed_level].file;
+
+       ret = ploop_check_delta_length(ploop, file, &file_size);
+       if (ret)
+               goto out;
+
+       ret = ploop_read_delta_metadata(ploop, file, &md_root, &size_in_clus);
+       if (ret)
+               goto out;
+
+       ret = -EFBIG;
+       if (changed_level != top_level(ploop) &&
+           size_in_clus > deltas[changed_level + 1].size_in_clus)
+               goto out;
+
+       ret = ploop_suspend_submitting_pios(ploop);
+       if (ret)
+               goto out;
+
+       notify_delta_merged(ploop, level, &md_root, forward, size_in_clus);
+
+       deltas[changed_level].file_size = file_size;
+       deltas[changed_level].file_preallocated_area_start = file_size;
+       deltas[changed_level].size_in_clus = size_in_clus;
+
+       ploop_resume_submitting_pios(ploop);
+       ret = 0;
+out:
+       free_md_pages_tree(&md_root);
+       return ret;
+}
+
+static int ploop_notify_merged(struct ploop *ploop, u8 level, bool forward)
+{
+       if (ploop->maintaince)
+               return -EBUSY;
+       if (level >= top_level(ploop))
+               return -ENOENT;
+       if (level == 0 && !forward)
+               return -EINVAL;
+       if (level == 0 && ploop->deltas[0].is_raw)
+               return -ENOTSUPP;
+       if (level == top_level(ploop) - 1 && forward)
+               return -EINVAL;
+       if (ploop->nr_deltas < 3)
+               return -EINVAL;
+       /*
+        * Userspace notifies us that it has copied clusters of
+        * ploop->deltas[@level] to ploop->deltas[@level + 1]
+        * (deltas[@level] to deltas[@level - 1] if !@forward).
+        * Now we want to update our bat_entries/levels arrays,
+        * where ploop->deltas[@level] is currently used, to use
+        * @level + 1 instead. Also we want to put @level's file,
+        * and renumber the deltas.
+        */
+       return ploop_delta_clusters_merged(ploop, level, forward);
+}
+
+static int ploop_get_delta_name_cmd(struct ploop *ploop, u8 level,
+                               char *result, unsigned int maxlen)
+{
+       struct file *file;
+       int len, ret = 1;
+       char *p;
+
+       if (level >= ploop->nr_deltas) {
+               result[0] = '\0';
+               goto out;
+       }
+
+       /*
+        * Nobody can change deltas in parallel, since
+        * other cmds are prohibited, but do this
+        * for uniformity.
+        */
+       read_lock_irq(&ploop->bat_rwlock);
+       file = get_file(ploop->deltas[level].file);
+       read_unlock_irq(&ploop->bat_rwlock);
+
+       p = file_path(file, result, maxlen);
+       if (p == ERR_PTR(-ENAMETOOLONG)) {
+               /* Notify target_message() that there is not enough space */
+               memset(result, 'x', maxlen - 1);
+               result[maxlen - 1] = 0;
+       } else if (IS_ERR_OR_NULL(p)) {
+               ret = PTR_ERR(p);
+       } else {
+               len = strlen(p);
+               memmove(result, p, len);
+               result[len] = '\n';
+               result[len + 1] = '\0';
+       }
+
+       fput(file);
+out:
+       return ret;
+}
+
+static int ploop_update_delta_index(struct ploop *ploop, unsigned int level,
+                                   const char *map)
+{
+       struct ploop_delta *delta;
+       loff_t file_size;
+       int ret;
+
+       if (ploop->maintaince)
+               return -EBUSY;
+       if (level >= top_level(ploop))
+               return -ENOENT;
+
+       ret = ploop_suspend_submitting_pios(ploop);
+       if (ret)
+               goto out;
+
+       delta = &ploop->deltas[level];
+       /* Index update may go together with file size increase */
+       ret = ploop_check_delta_length(ploop, delta->file, &file_size);
+       if (ret)
+               goto resume;
+       delta->file_size = file_size;
+       delta->file_preallocated_area_start = file_size;
+
+       ret = process_update_delta_index(ploop, level, map);
+
+resume:
+       ploop_resume_submitting_pios(ploop);
+out:
+       return ret;
+}
+
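+/*
+ * Swap the top delta with the delta below it: rebuild holes_bitmap and
+ * exchange bat_levels between the two upper levels.
+ */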
+static int process_flip_upper_deltas(struct ploop *ploop)
+{
+       u32 i, end, bat_clusters, hb_nr, *bat_entries;
+       void *holes_bitmap = ploop->holes_bitmap;
+       u8 level = top_level(ploop) - 1;
+       struct rb_node *node;
+       struct md_page *md;
+       u64 size;
+
+       size = (PLOOP_MAP_OFFSET + ploop->nr_bat_entries) * sizeof(map_index_t);
+       bat_clusters = DIV_ROUND_UP(size, CLU_SIZE(ploop));
+       hb_nr = ploop->hb_nr;
+
+       write_lock_irq(&ploop->bat_rwlock);
+       /* Prepare holes_bitmap */
+       memset(holes_bitmap, 0xff, hb_nr / 8);
+       for (i = (hb_nr & ~0x7); i < hb_nr; i++)
+               set_bit(i, holes_bitmap);
+       for (i = 0; i < bat_clusters; i++)
+               clear_bit(i, holes_bitmap);
+
+       /* Flip bat entries */
+       ploop_for_each_md_page(ploop, md, node) {
+               ploop_init_be_iter(ploop, md->id, &i, &end);
+               bat_entries = kmap_atomic(md->page);
+               for (; i <= end; i++) {
+                       if (bat_entries[i] == BAT_ENTRY_NONE)
+                               continue;
+                       if (md->bat_levels[i] == level) {
+                               md->bat_levels[i] = top_level(ploop);
+                               clear_bit(bat_entries[i], holes_bitmap);
+                       } else if (md->bat_levels[i] == top_level(ploop)) {
+                               md->bat_levels[i] = level;
+                       }
+               }
+               kunmap_atomic(bat_entries);
+       }
+
+       /* FIXME */
+       swap(ploop->deltas[level], ploop->deltas[level + 1]);
+       write_unlock_irq(&ploop->bat_rwlock);
+       return 0;
+}
+
+static int ploop_set_falloc_new_clu(struct ploop *ploop, u64 val)
+{
+       if (val > 1)
+               return -EINVAL;
+       ploop->falloc_new_clu = !!val;
+       return 0;
+}
+
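+/*
+ * Mark all clusters which are already occupied in the image (and all
+ * dst_clu referenced from the top delta) as dirty in @tracking_bitmap.
+ */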
+static int process_tracking_start(struct ploop *ploop, void *tracking_bitmap,
+                                 u32 tb_nr)
+{
+       u32 i, nr_pages, end, *bat_entries, dst_clu, nr;
+       struct rb_node *node;
+       struct md_page *md;
+       int ret = 0;
+
+       write_lock_irq(&ploop->bat_rwlock);
+       ploop->tracking_bitmap = tracking_bitmap;
+       ploop->tb_nr = tb_nr;
+
+       for_each_clear_bit(i, ploop->holes_bitmap, ploop->hb_nr)
+               set_bit(i, tracking_bitmap);
+       nr_pages = bat_clu_to_page_nr(ploop->nr_bat_entries - 1) + 1;
+       nr = 0;
+
+       ploop_for_each_md_page(ploop, md, node) {
+               ploop_init_be_iter(ploop, md->id, &i, &end);
+               bat_entries = kmap_atomic(md->page);
+               for (; i <= end; i++) {
+                       dst_clu = bat_entries[i];
+                       if (dst_clu == BAT_ENTRY_NONE ||
+                           md->bat_levels[i] != top_level(ploop))
+                               continue;
+                       if (WARN_ON(dst_clu >= tb_nr)) {
+                               ret = -EIO;
+                               break;
+                       }
+                       set_bit(dst_clu, tracking_bitmap);
+               }
+               kunmap_atomic(bat_entries);
+               if (ret)
+                       break;
+               nr++;
+       }
+       write_unlock_irq(&ploop->bat_rwlock);
+
+       BUG_ON(ret == 0 && nr != nr_pages);
+       return ret;
+}
+
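+/*
+ * Report the next dirty clu after the cursor (wrapping around), clear its
+ * bit and advance the cursor. Returns -EAGAIN if no dirty clusters remain.
+ */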
+static int tracking_get_next(struct ploop *ploop, char *result,
+                            unsigned int maxlen)
+{
+       unsigned int i, sz = 0, tb_nr = ploop->tb_nr, prev = ploop->tb_cursor;
+       void *tracking_bitmap = ploop->tracking_bitmap;
+       int ret = -EAGAIN;
+
+       if (WARN_ON_ONCE(prev > tb_nr - 1))
+               prev = 0;
+
+       write_lock_irq(&ploop->bat_rwlock);
+       i = find_next_bit(tracking_bitmap, tb_nr, prev + 1);
+       if (i < tb_nr)
+               goto found;
+       i = find_first_bit(tracking_bitmap, prev + 1);
+       if (i >= prev + 1)
+               goto unlock;
+found:
+       ret = (DMEMIT("%u\n", i)) ? 1 : 0;
+       if (ret)
+               clear_bit(i, tracking_bitmap);
+unlock:
+       write_unlock_irq(&ploop->bat_rwlock);
+       if (ret > 0)
+               ploop->tb_cursor = i;
+       return ret;
+}
+
+static u32 max_dst_clu_in_top_delta(struct ploop *ploop)
+{
+       u32 i, nr_pages, nr = 0, end, *bat_entries, dst_clu = 0;
+       struct rb_node *node;
+       struct md_page *md;
+
+       nr_pages = bat_clu_to_page_nr(ploop->nr_bat_entries - 1) + 1;
+
+       read_lock_irq(&ploop->bat_rwlock);
+       ploop_for_each_md_page(ploop, md, node) {
+               ploop_init_be_iter(ploop, md->id, &i, &end);
+               bat_entries = kmap_atomic(md->page);
+               for (; i <= end; i++) {
+                       if (dst_clu < bat_entries[i] &&
+                           md->bat_levels[i] == top_level(ploop))
+                               dst_clu = bat_entries[i];
+               }
+               kunmap_atomic(bat_entries);
+               nr++;
+       }
+       read_unlock_irq(&ploop->bat_rwlock);
+
+       BUG_ON(nr != nr_pages);
+       return dst_clu;
+}
+
+static int ploop_tracking_cmd(struct ploop *ploop, const char *suffix,
+                             char *result, unsigned int maxlen)
+{
+       void *tracking_bitmap = NULL;
+       unsigned int tb_nr, size;
+       int ret = 0;
+
+       if (ploop_is_ro(ploop))
+               return -EROFS;
+
+       if (!strcmp(suffix, "get_next")) {
+               if (!ploop->tracking_bitmap)
+                       return -ENOENT;
+               return tracking_get_next(ploop, result, maxlen);
+       }
+
+       if (!strcmp(suffix, "start")) {
+               if (ploop->tracking_bitmap)
+                       return -EEXIST;
+               if (ploop->maintaince)
+                       return -EBUSY;
+               /* max_dst_clu_in_top_delta() may be above hb_nr */
+               tb_nr = max_dst_clu_in_top_delta(ploop) + 1;
+               if (tb_nr < ploop->hb_nr)
+                       tb_nr = ploop->hb_nr;
+               /*
+                * After max_dst_clu_in_top_delta() unlocks the lock,
+                * new entries above tb_nr can't occur, since we always
+                * alloc clusters from holes_bitmap (and their numbers are < hb_nr).
+                */
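+               /* Bitmap size rounded up to whole unsigned longs */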
+               size = DIV_ROUND_UP(tb_nr, 8 * sizeof(unsigned long));
+               size *= sizeof(unsigned long);
+               tracking_bitmap = kvzalloc(size, GFP_KERNEL);
+               if (!tracking_bitmap)
+                       return -ENOMEM;
+               ploop->tb_cursor = tb_nr - 1;
+
+               ret = ploop_suspend_submitting_pios(ploop);
+               if (ret)
+                       return ret;
+
+               ploop->maintaince = true;
+               ret = process_tracking_start(ploop, tracking_bitmap, tb_nr);
+
+               ploop_resume_submitting_pios(ploop);
+
+               if (ret)
+                       goto stop;
+       } else if (!strcmp(suffix, "stop")) {
+               if (!ploop->tracking_bitmap)
+                       return -ENOENT;
+stop:
+               write_lock_irq(&ploop->bat_rwlock);
+               kvfree(ploop->tracking_bitmap);
+               ploop->tracking_bitmap = NULL;
+               write_unlock_irq(&ploop->bat_rwlock);
+               ploop->maintaince = false;
+       } else {
+               return -EINVAL;
+       }
+
+       return ret;
+}
+
+static int ploop_set_noresume(struct ploop *ploop, char *mode)
+{
+       bool noresume;
+
+       if (!strcmp(mode, "1"))
+               noresume = true;
+       else if (!strcmp(mode, "0"))
+               noresume = false;
+       else
+               return -EINVAL;
+
+       if (noresume == ploop->noresume)
+               return -EBUSY;
+
+       ploop->noresume = noresume;
+       return 0;
+}
+
+static int ploop_check_delta_before_flip(struct ploop *ploop, struct file *file)
+{
+       int ret = 0;
+#ifdef PLOOP_DEBUG
+       u32 i, end, *d_bat_entries, clu, size_in_clus;
+       struct rb_root md_root = RB_ROOT;
+       struct md_page *md, *d_md;
+       struct rb_node *node;
+       bool stop = false;
+
+       ret = ploop_read_delta_metadata(ploop, file, &md_root,
+                                       &size_in_clus);
+       if (ret) {
+               pr_err("Error reading metadata\n");
+               goto out;
+       }
+
+       /* Points to hdr since md_page[0] also contains hdr. */
+       d_md = md_first_entry(&md_root);
+
+       write_lock_irq(&ploop->bat_rwlock);
+       ploop_for_each_md_page(ploop, md, node) {
+               init_be_iter(size_in_clus, md->id, &i, &end);
+               d_bat_entries = kmap_atomic(d_md->page);
+               for (; i <= end; i++) {
+                       if (md_page_cluster_is_in_top_delta(ploop, md, i) &&
+                           d_bat_entries[i] != BAT_ENTRY_NONE) {
+                               ret = -EEXIST;
+                               stop = true;
+                               goto unmap;
+                       }
+               }
+
+               clu = page_clu_idx_to_bat_clu(md->id, i);
+               if (clu == size_in_clus - 1) {
+                       stop = true;
+                       goto unmap;
+               }
+unmap:
+               kunmap_atomic(d_bat_entries);
+               if (stop)
+                       break;
+               d_md = md_next_entry(d_md);
+       }
+
+       write_unlock_irq(&ploop->bat_rwlock);
+       free_md_pages_tree(&md_root);
+out:
+#endif
+       return ret;
+}
+
+static int ploop_flip_upper_deltas(struct ploop *ploop)
+{
+       struct file *file;
+       int ret;
+
+       if (!ploop->suspended || !ploop->noresume || ploop->maintaince)
+               return -EBUSY;
+       if (ploop_is_ro(ploop))
+               return -EROFS;
+       if (ploop->nr_deltas < 2)
+               return -ENOENT;
+       if (ploop->deltas[ploop->nr_deltas - 2].is_raw)
+               return -EBADSLT;
+       file = ploop->deltas[ploop->nr_deltas - 2].file;
+       if (!(file->f_mode & FMODE_WRITE))
+               return -EACCES;
+
+       ret = ploop_check_delta_before_flip(ploop, file);
+       if (ret)
+               return ret;
+
+       return process_flip_upper_deltas(ploop);
+}
+
+static int ploop_get_event(struct ploop *ploop, char *result, unsigned int maxlen)
+{
+       unsigned int sz = 0;
+       int ret = 0;
+
+       spin_lock_irq(&ploop->deferred_lock);
+       if (ploop->event_enospc) {
+               ret = (DMEMIT("event_ENOSPC\n")) ? 1 : 0;
+               if (ret)
+                       ploop->event_enospc = false;
+       }
+       spin_unlock_irq(&ploop->deferred_lock);
+
+       return ret;
+}
+
+static bool msg_wants_down_read(const char *cmd)
+{
+       /* TODO: kill get_delta_name */
+       if (!strcmp(cmd, "get_img_name"))
+               return true;
+
+       return false;
+}
+
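+/*
+ * Message interface of the target; illustrative usage via dmsetup:
+ *   dmsetup message <ploop-dev> 0 resize <val>
+ *   dmsetup message <ploop-dev> 0 tracking_start
+ *   dmsetup message <ploop-dev> 0 get_event
+ */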
+int ploop_message(struct dm_target *ti, unsigned int argc, char **argv,
+                 char *result, unsigned int maxlen)
+{
+       struct ploop *ploop = ti->private;
+       bool read, forward = true;
+       int ret = -EPERM;
+       u64 val;
+
+       if (!capable(CAP_SYS_ADMIN))
+               goto out;
+
+       ret = -EINVAL;
+       if (argc < 1)
+               goto out;
+
+       if (!strcmp(argv[0], "get_event")) {
+               if (argc == 1)
+                       ret = ploop_get_event(ploop, result, maxlen);
+               goto out;
+       }
+
+       read = msg_wants_down_read(argv[0]);
+       if (read)
+               down_read(&ploop->ctl_rwsem);
+       else
+               down_write(&ploop->ctl_rwsem);
+
+       if (!strcmp(argv[0], "resize")) {
+               if (argc != 2 || kstrtou64(argv[1], 10, &val) < 0)
+                       goto unlock;
+               ret = ploop_resize(ploop, val);
+       } else if (!strcmp(argv[0], "merge")) {
+               if (argc == 1)
+                       ret = ploop_merge_latest_snapshot(ploop);
+       } else if (!strncmp(argv[0], "notify_merged_", 14)) {
+               if (!strcmp(&argv[0][14], "backward"))
+                       forward = false;
+               else if (strcmp(&argv[0][14], "forward"))
+                       goto unlock;
+               if (argc != 2 || kstrtou64(argv[1], 10, &val) < 0)
+                       goto unlock;
+               ret = ploop_notify_merged(ploop, val, forward);
+       } else if (!strcmp(argv[0], "get_img_name")) {
+               if (argc != 2 || kstrtou64(argv[1], 10, &val) < 0)
+                       goto unlock;
+               ret = ploop_get_delta_name_cmd(ploop, (u8)val, result, maxlen);
+       } else if (!strcmp(argv[0], "update_delta_index")) {
+               if (argc != 3 || kstrtou64(argv[1], 10, &val) < 0)
+                       goto unlock;
+               ret = ploop_update_delta_index(ploop, val, argv[2]);
+       } else if (!strcmp(argv[0], "set_falloc_new_clu")) {
+               if (argc != 2 || kstrtou64(argv[1], 10, &val) < 0)
+                       goto unlock;
+               ret = ploop_set_falloc_new_clu(ploop, val);
+       } else if (!strncmp(argv[0], "tracking_", 9)) {
+               if (argc != 1)
+                       goto unlock;
+               ret = ploop_tracking_cmd(ploop, argv[0] + 9, result, maxlen);
+       } else if (!strcmp(argv[0], "set_noresume")) {
+               if (argc != 2)
+                       goto unlock;
+               ret = ploop_set_noresume(ploop, argv[1]);
+       } else if (!strcmp(argv[0], "flip_upper_deltas")) {
+               if (argc != 1)
+                       goto unlock;
+               ret = ploop_flip_upper_deltas(ploop);
+       } else {
+               ret = -ENOTSUPP;
+       }
+
+unlock:
+       if (read)
+               up_read(&ploop->ctl_rwsem);
+       else
+               up_write(&ploop->ctl_rwsem);
+out:
+       return ret;
+}
diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
new file mode 100644
index 000000000000..16c70ec6901c
--- /dev/null
+++ b/drivers/md/dm-ploop-map.c
@@ -0,0 +1,1963 @@
+/*
+ *  drivers/md/dm-ploop-map.c
+ *
+ *  Copyright (c) 2020-2021 Virtuozzo International GmbH. All rights reserved.
+ *
+ */
+
+#include <linux/buffer_head.h>
+#include <linux/dm-io.h>
+#include <linux/dm-kcopyd.h>
+#include <linux/sched/mm.h>
+#include <linux/cgroup.h>
+#include <linux/blk-cgroup.h>
+#include <linux/init.h>
+#include <linux/vmalloc.h>
+#include <linux/uio.h>
+#include <linux/blk-mq.h>
+#include <uapi/linux/falloc.h>
+#include "dm-ploop.h"
+#include "dm-rq.h"
+
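+/*
+ * Step by which delta files are preallocated or grown
+ * (see truncate_prealloc_safe()).
+ */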
+#define PREALLOC_SIZE (128ULL * 1024 * 1024)
+
+static void handle_cleanup(struct ploop *ploop, struct pio *pio);
+static void prq_endio(struct pio *pio, void *prq_ptr, blk_status_t bi_status);
+
+#define DM_MSG_PREFIX "ploop"
+
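+/* Count bio_vec segments covered by the pio's iterator (for iov_iter_bvec()) */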
+static unsigned int pio_nr_segs(struct pio *pio)
+{
+       struct bvec_iter bi = {
+               .bi_size = pio->bi_iter.bi_size,
+               .bi_bvec_done = pio->bi_iter.bi_bvec_done,
+               .bi_idx = pio->bi_iter.bi_idx,
+       };
+       unsigned int nr_segs = 0;
+       struct bio_vec bv;
+
+       for_each_bvec(bv, pio->bi_io_vec, bi, bi)
+               nr_segs++;
+
+       return nr_segs;
+}
+
+void ploop_index_wb_init(struct ploop_index_wb *piwb, struct ploop *ploop)
+{
+       piwb->ploop = ploop;
+       piwb->comp = NULL;
+       piwb->comp_bi_status = NULL;
+       spin_lock_init(&piwb->lock);
+       piwb->md = NULL;
+       piwb->bat_page = NULL;
+       piwb->bi_status = 0;
+       INIT_LIST_HEAD(&piwb->ready_data_pios);
+       INIT_LIST_HEAD(&piwb->cow_list);
+       /* For ploop_bat_write_complete() */
+       atomic_set(&piwb->count, 1);
+       piwb->completed = false;
+       piwb->page_id = PAGE_NR_NONE;
+       piwb->type = PIWB_TYPE_ALLOC;
+}
+
+void init_pio(struct ploop *ploop, unsigned int bi_op, struct pio *pio)
+{
+       pio->ploop = ploop;
+       pio->css = NULL;
+       pio->bi_op = bi_op;
+       pio->wants_discard_index_cleanup = false;
+       pio->is_data_alloc = false;
+       pio->is_fake_merge = false;
+       pio->free_on_endio = false;
+       pio->ref_index = PLOOP_REF_INDEX_INVALID;
+       pio->queue_list_id = PLOOP_LIST_DEFERRED;
+       pio->bi_status = BLK_STS_OK;
+       atomic_set(&pio->remaining, 1);
+       pio->piwb = NULL;
+       INIT_LIST_HEAD(&pio->list);
+       INIT_HLIST_NODE(&pio->hlist_node);
+       INIT_LIST_HEAD(&pio->endio_list);
+       /* FIXME: assign real clu? */
+       pio->clu = UINT_MAX;
+       pio->level = BAT_LEVEL_INVALID;
+}
+
+/* Get clu related to pio sectors */
+static int ploop_rq_valid(struct ploop *ploop, struct request *rq)
+{
+       sector_t sector = blk_rq_pos(rq);
+       loff_t end_byte;
+       u32 end_clu;
+
+       end_byte = to_bytes(sector) + blk_rq_bytes(rq) - 1;
+       end_clu = POS_TO_CLU(ploop, end_byte);
+
+       if (unlikely(end_clu >= ploop->nr_bat_entries)) {
+               /*
+                * This mustn't happen, since we set max_io_len
+                * via dm_set_target_max_io_len().
+                */
+               WARN_ONCE(1, "sec=%llu, size=%u, end_clu=%u, nr=%u\n",
+                         sector, blk_rq_bytes(rq),
+                         end_clu, ploop->nr_bat_entries);
+               return -EINVAL;
+       }
+
+       return 0;
+}
+
+static void init_prq(struct ploop_rq *prq, struct request *rq)
+{
+       prq->rq = rq;
+       prq->bvec = NULL;
+       prq->css = NULL;
+#ifdef CONFIG_BLK_CGROUP
+       if (rq->bio && rq->bio->bi_blkg) {
+               prq->css = &bio_blkcg(rq->bio)->css;
+               css_get(prq->css); /* css_put is in prq_endio */
+       }
+#endif
+}
+
+static void init_prq_and_embedded_pio(struct ploop *ploop, struct request *rq,
+                                     struct ploop_rq *prq, struct pio *pio)
+{
+       init_prq(prq, rq);
+       init_pio(ploop, req_op(rq), pio);
+       pio->css = prq->css;
+
+       pio->endio_cb = prq_endio;
+       pio->endio_cb_data = prq;
+}
+
+void ploop_enospc_timer(struct timer_list *timer)
+{
+       struct ploop *ploop = from_timer(ploop, timer, enospc_timer);
+       unsigned long flags;
+       LIST_HEAD(list);
+
+       spin_lock_irqsave(&ploop->deferred_lock, flags);
+       list_splice_init(&ploop->enospc_pios, &list);
+       spin_unlock_irqrestore(&ploop->deferred_lock, flags);
+
+       submit_embedded_pios(ploop, &list);
+}
+
+void ploop_event_work(struct work_struct *ws)
+{
+       struct ploop *ploop = container_of(ws, struct ploop, event_work);
+
+       dm_table_event(ploop->ti->table);
+}
+
+static bool ploop_try_delay_enospc(struct ploop_rq *prq, struct pio *pio)
+{
+       struct ploop *ploop = pio->ploop;
+       bool delayed = true;
+       unsigned long flags;
+
+       spin_lock_irqsave(&ploop->deferred_lock, flags);
+       if (unlikely(ploop->wants_suspend)) {
+               delayed = false;
+               goto unlock;
+       }
+
+       init_prq_and_embedded_pio(ploop, prq->rq, prq, pio);
+
+       ploop->event_enospc = true;
+       list_add_tail(&pio->list, &ploop->enospc_pios);
+unlock:
+       spin_unlock_irqrestore(&ploop->deferred_lock, flags);
+
+       if (delayed)
+               mod_timer(&ploop->enospc_timer, jiffies + PLOOP_ENOSPC_TIMEOUT);
+       schedule_work(&ploop->event_work);
+
+       return delayed;
+}
+
+static void prq_endio(struct pio *pio, void *prq_ptr, blk_status_t bi_status)
+{
+       struct ploop_rq *prq = prq_ptr;
+       struct request *rq = prq->rq;
+
+       if (prq->bvec)
+               kfree(prq->bvec);
+       if (prq->css)
+               css_put(prq->css);
+       /*
+        * Here is the exit point for rq, and here we handle ENOSPC.
+        * Embedded pios will be reinitialized as if they had just
+        * come from the upper dm level, and later resubmitted after
+        * a timeout. Note that we do not handle merge here: merge
+        * callers receive -ENOSPC synchronously, without intermediaries.
+        */
+       if (unlikely(bi_status == BLK_STS_NOSPC)) {
+               WARN_ON_ONCE(!op_is_write(pio->bi_op));
+               if (ploop_try_delay_enospc(prq, pio))
+                       return;
+       }
+
+       mempool_free(prq, pio->ploop->prq_pool);
+       dm_complete_request(rq, bi_status);
+}
+
+static void do_pio_endio(struct pio *pio)
+{
+       ploop_endio_t endio_cb = pio->endio_cb;
+       void *endio_cb_data = pio->endio_cb_data;
+       bool free_on_endio = pio->free_on_endio;
+
+       if (!atomic_dec_and_test(&pio->remaining))
+               return;
+
+       endio_cb(pio, endio_cb_data, pio->bi_status);
+
+       if (free_on_endio)
+               free_pio(pio->ploop, pio);
+}
+
+void pio_endio(struct pio *pio)
+{
+       struct ploop *ploop = pio->ploop;
+
+       if (pio->ref_index != PLOOP_REF_INDEX_INVALID)
+               track_pio(ploop, pio);
+
+       handle_cleanup(ploop, pio);
+
+       do_pio_endio(pio);
+}
+
+static void pio_chain_endio(struct pio *pio, void *parent_ptr,
+                           blk_status_t bi_status)
+{
+       struct pio *parent = parent_ptr;
+
+       if (unlikely(bi_status))
+               parent->bi_status = bi_status;
+
+       do_pio_endio(parent);
+}
+
+static void pio_chain(struct pio *pio, struct pio *parent)
+{
+       BUG_ON(pio->endio_cb_data || pio->endio_cb);
+
+       pio->endio_cb_data = parent;
+       pio->endio_cb = pio_chain_endio;
+       atomic_inc(&parent->remaining);
+}
+
+/* Clone of bio_advance_iter() */
+static void pio_advance(struct pio *pio, unsigned int bytes)
+{
+       struct bvec_iter *iter = &pio->bi_iter;
+
+       iter->bi_sector += bytes >> 9;
+
+       if (op_is_discard(pio->bi_op))
+               iter->bi_size -= bytes;
+       else
+               bvec_iter_advance(pio->bi_io_vec, iter, bytes);
+}
+
+static struct pio * split_and_chain_pio(struct ploop *ploop,
+               struct pio *pio, u32 len)
+{
+       struct pio *split;
+
+       split = alloc_pio(ploop, GFP_NOIO);
+       if (!split)
+               return NULL;
+
+       init_pio(ploop, pio->bi_op, split);
+       split->css = pio->css;
+       split->queue_list_id = pio->queue_list_id;
+       split->free_on_endio = true;
+       split->bi_io_vec = pio->bi_io_vec;
+       split->bi_iter = pio->bi_iter;
+       split->bi_iter.bi_size = len;
+       split->endio_cb = NULL;
+       split->endio_cb_data = NULL;
+       pio_chain(split, pio);
+       if (len)
+               pio_advance(pio, len);
+       return split;
+}
+
+static int split_pio_to_list(struct ploop *ploop, struct pio *pio,
+                            struct list_head *ret_list)
+{
+       u32 clu_size = CLU_SIZE(ploop);
+       struct pio *split;
+       LIST_HEAD(list);
+
+       while (1) {
+               loff_t start = to_bytes(pio->bi_iter.bi_sector);
+               loff_t end = start + pio->bi_iter.bi_size;
+               unsigned int len;
+
+               WARN_ON_ONCE(start == end);
+
+               if (start / clu_size == (end - 1) / clu_size)
+                       break;
+               end = round_up(start + 1, clu_size);
+               len = end - start;
+
+               split = split_and_chain_pio(ploop, pio, len);
+               if (!split)
+                       goto err;
+
+               list_add_tail(&split->list, &list);
+       }
+
+       list_splice_tail(&list, ret_list);
+       list_add_tail(&pio->list, ret_list);
+       return 0;
+err:
+       while ((pio = pio_list_pop(&list)) != NULL) {
+               pio->bi_status = BLK_STS_RESOURCE;
+               pio_endio(pio);
+       }
+       return -ENOMEM;
+}
+
+static void dispatch_pio(struct ploop *ploop, struct pio *pio,
+                        bool *is_data, bool *is_flush)
+{
+       struct list_head *list = &ploop->pios[pio->queue_list_id];
+
+       lockdep_assert_held(&ploop->deferred_lock);
+       WARN_ON_ONCE(pio->queue_list_id >= PLOOP_LIST_COUNT);
+
+       if (pio->queue_list_id == PLOOP_LIST_FLUSH)
+               *is_flush = true;
+       else
+               *is_data = true;
+
+       list_add_tail(&pio->list, list);
+}
+
+void dispatch_pios(struct ploop *ploop, struct pio *pio, struct list_head *pio_list)
+{
+       bool is_data = false, is_flush = false;
+       unsigned long flags;
+
+       spin_lock_irqsave(&ploop->deferred_lock, flags);
+       if (pio)
+               dispatch_pio(ploop, pio, &is_data, &is_flush);
+       if (pio_list) {
+               while ((pio = pio_list_pop(pio_list)) != NULL)
+                       dispatch_pio(ploop, pio, &is_data, &is_flush);
+       }
+       spin_unlock_irqrestore(&ploop->deferred_lock, flags);
+
+       if (is_data)
+               queue_work(ploop->wq, &ploop->worker);
+       if (is_flush)
+               queue_work(ploop->wq, &ploop->fsync_worker);
+}
+
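+/*
+ * If the md page has a pending index writeback of another type, or is
+ * already under writeback, park the pio on md->wait_list; it will be
+ * re-dispatched from ploop_advance_local_after_bat_wb().
+ */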
+static bool delay_if_md_busy(struct ploop *ploop, struct md_page *md,
+                            enum piwb_type type, struct pio *pio)
+{
+       struct ploop_index_wb *piwb;
+       unsigned long flags;
+       bool busy = false;
+
+       WARN_ON_ONCE(!list_empty(&pio->list));
+
+       write_lock_irqsave(&ploop->bat_rwlock, flags);
+       piwb = md->piwb;
+       if (piwb && (piwb->type != type || (md->status & MD_WRITEBACK))) {
+               list_add_tail(&pio->list, &md->wait_list);
+               busy = true;
+       }
+       write_unlock_irqrestore(&ploop->bat_rwlock, flags);
+
+       return busy;
+}
+
+void track_dst_cluster(struct ploop *ploop, u32 dst_clu)
+{
+       unsigned long flags;
+
+       if (!ploop->tracking_bitmap)
+               return;
+
+       read_lock_irqsave(&ploop->bat_rwlock, flags);
+       if (ploop->tracking_bitmap && !WARN_ON(dst_clu >= ploop->tb_nr))
+               set_bit(dst_clu, ploop->tracking_bitmap);
+       read_unlock_irqrestore(&ploop->bat_rwlock, flags);
+}
+
+/*
+ * Userspace calls dm_suspend() to get changed blocks finally.
+ * dm_suspend() waits for dm's inflight bios, so this function
+ * must be called after @bio is written and before @bio is ended.
+ * The only possible exception is writes driven by "message" ioctl.
+ * Thus, userspace mustn't do maintenance operations in parallel
+ * with tracking.
+ */
+void __track_pio(struct ploop *ploop, struct pio *pio)
+{
+       u32 dst_clu = SEC_TO_CLU(ploop, pio->bi_iter.bi_sector);
+
+       if (!op_is_write(pio->bi_op) || !bvec_iter_sectors((pio)->bi_iter))
+               return;
+
+       track_dst_cluster(ploop, dst_clu);
+}
+
+static void queue_discard_index_wb(struct ploop *ploop, struct pio *pio)
+{
+       pio->queue_list_id = PLOOP_LIST_DISCARD;
+       dispatch_pios(ploop, pio, NULL);
+}
+
+/* Zero all data bytes covered by @pio->bi_iter in @pio->bi_io_vec */
+static void zero_fill_pio(struct pio *pio)
+{
+       struct bvec_iter bi = {
+               .bi_size = pio->bi_iter.bi_size,
+               .bi_bvec_done = pio->bi_iter.bi_bvec_done,
+               .bi_idx = pio->bi_iter.bi_idx,
+       };
+       struct bio_vec bv;
+       void *data;
+
+       for_each_bvec(bv, pio->bi_io_vec, bi, bi) {
+               if (!bv.bv_len)
+                       continue;
+               data = kmap(bv.bv_page);
+               memset(data + bv.bv_offset, 0, bv.bv_len);
+               kunmap(bv.bv_page);
+       }
+}
+
+struct pio *find_pio(struct hlist_head head[], u32 clu)
+{
+       struct hlist_head *slot = ploop_htable_slot(head, clu);
+       struct pio *pio;
+
+       BUG_ON(!slot);
+
+       hlist_for_each_entry(pio, slot, hlist_node) {
+               if (pio->clu == clu)
+                       return pio;
+       }
+
+       return NULL;
+}
+
+static struct pio *find_inflight_bio(struct ploop *ploop, u32 clu)
+{
+       lockdep_assert_held(&ploop->inflight_lock);
+       return find_pio(ploop->inflight_pios, clu);
+}
+
+struct pio *find_lk_of_cluster(struct ploop *ploop, u32 clu)
+{
+       lockdep_assert_held(&ploop->deferred_lock);
+       return find_pio(ploop->exclusive_pios, clu);
+}
+
+static void add_endio_pio(struct pio *head, struct pio *pio)
+{
+       list_add_tail(&pio->list, &head->endio_list);
+}
+
+static void inc_nr_inflight(struct ploop *ploop, struct pio *pio)
+{
+       unsigned char ref_index = ploop->inflight_bios_ref_index;
+
+       if (!WARN_ON_ONCE(pio->ref_index != PLOOP_REF_INDEX_INVALID)) {
+               percpu_ref_get(&ploop->inflight_bios_ref[ref_index]);
+               pio->ref_index = ref_index;
+       }
+}
+
+/*
+ * Note that do_ploop_work() waits for the final ref to be dropped in
+ * dec_nr_inflight() (e.g., on grow), so the code decrementing the
+ * counter must not depend on that work or on any actions it performs.
+ *
+ * The only intended usecase is that the counter is decremented
+ * from the endio of bios submitted to the underlying device (loop) or
+ * from ki_complete of requests submitted to delta files
+ * (while the increment occurs right before submitting).
+ */
+static void dec_nr_inflight(struct ploop *ploop, struct pio *pio)
+{
+       if (pio->ref_index != PLOOP_REF_INDEX_INVALID) {
+               percpu_ref_put(&ploop->inflight_bios_ref[pio->ref_index]);
+               pio->ref_index = PLOOP_REF_INDEX_INVALID;
+       }
+}
+
+static void link_pio(struct hlist_head head[], struct pio *pio,
+                    u32 clu, bool exclusive)
+{
+       struct hlist_head *slot = ploop_htable_slot(head, clu);
+
+       if (exclusive)
+               WARN_ON_ONCE(find_pio(head, clu) != NULL);
+
+       BUG_ON(!hlist_unhashed(&pio->hlist_node));
+       hlist_add_head(&pio->hlist_node, slot);
+       pio->clu = clu;
+}
+
+/*
+ * Removes @pio of a completed bio either from the inflight_pios or
+ * the exclusive_pios hash table. Pios from its endio_list are requeued
+ * to the deferred list.
+ */
+static void unlink_pio(struct ploop *ploop, struct pio *pio,
+                      struct list_head *pio_list)
+{
+       BUG_ON(hlist_unhashed(&pio->hlist_node));
+
+       hlist_del_init(&pio->hlist_node);
+       list_splice_tail_init(&pio->endio_list, pio_list);
+}
+
+static void add_cluster_lk(struct ploop *ploop, struct pio *pio, u32 clu)
+{
+       unsigned long flags;
+
+       spin_lock_irqsave(&ploop->deferred_lock, flags);
+       link_pio(ploop->exclusive_pios, pio, clu, true);
+       spin_unlock_irqrestore(&ploop->deferred_lock, flags);
+}
+static void del_cluster_lk(struct ploop *ploop, struct pio *pio)
+{
+       LIST_HEAD(pio_list);
+       unsigned long flags;
+
+       spin_lock_irqsave(&ploop->deferred_lock, flags);
+       unlink_pio(ploop, pio, &pio_list);
+       spin_unlock_irqrestore(&ploop->deferred_lock, flags);
+
+       if (!list_empty(&pio_list))
+               dispatch_pios(ploop, NULL, &pio_list);
+}
+
+static void link_submitting_pio(struct ploop *ploop, struct pio *pio, u32 clu)
+{
+       unsigned long flags;
+
+       spin_lock_irqsave(&ploop->inflight_lock, flags);
+       link_pio(ploop->inflight_pios, pio, clu, false);
+       spin_unlock_irqrestore(&ploop->inflight_lock, flags);
+}
+static void unlink_completed_pio(struct ploop *ploop, struct pio *pio)
+{
+       LIST_HEAD(pio_list);
+       unsigned long flags;
+
+       if (hlist_unhashed(&pio->hlist_node))
+               return;
+
+       spin_lock_irqsave(&ploop->inflight_lock, flags);
+       unlink_pio(ploop, pio, &pio_list);
+       spin_unlock_irqrestore(&ploop->inflight_lock, flags);
+
+       if (!list_empty(&pio_list))
+               dispatch_pios(ploop, NULL, &pio_list);
+}
+
+static bool ploop_md_make_dirty(struct ploop *ploop, struct md_page *md)
+{
+       unsigned long flags;
+       bool new = false;
+
+       write_lock_irqsave(&ploop->bat_rwlock, flags);
+       WARN_ON_ONCE((md->status & MD_WRITEBACK));
+       if (!(md->status & MD_DIRTY)) {
+               md->status |= MD_DIRTY;
+               list_add_tail(&md->wb_link, &ploop->wb_batch_list);
+               new = true;
+       }
+       write_unlock_irqrestore(&ploop->bat_rwlock, flags);
+
+       return new;
+}
+
+static bool pio_endio_if_all_zeros(struct pio *pio)
+{
+       struct bvec_iter bi = {
+               .bi_size = pio->bi_iter.bi_size,
+               .bi_bvec_done = pio->bi_iter.bi_bvec_done,
+               .bi_idx = pio->bi_iter.bi_idx,
+       };
+       struct bio_vec bv;
+       void *data, *ret;
+
+       for_each_bvec(bv, pio->bi_io_vec, bi, bi) {
+               if (!bv.bv_len)
+                       continue;
+               data = kmap(bv.bv_page);
+               ret = memchr_inv(data + bv.bv_offset, 0, bv.bv_len);
+               kunmap(bv.bv_page);
+               if (ret)
+                       return false;
+       }
+
+       pio_endio(pio);
+       return true;
+}
+
+static bool pio_endio_if_merge_fake_pio(struct pio *pio)
+{
+       if (likely(!fake_merge_pio(pio)))
+               return false;
+       pio_endio(pio);
+       return true;
+}
+
+static int punch_hole(struct file *file, loff_t pos, loff_t len)
+{
+       return vfs_fallocate(file, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE,
+                            pos, len);
+}
+
+static int zero_range(struct file *file, loff_t pos, loff_t len)
+{
+       return vfs_fallocate(file, FALLOC_FL_ZERO_RANGE|FALLOC_FL_KEEP_SIZE,
+                            pos, len);
+}
+
+static void handle_discard_pio(struct ploop *ploop, struct pio *pio,
+                              u32 clu, u32 dst_clu)
+{
+       struct pio *inflight_h;
+       unsigned long flags;
+       loff_t pos;
+       int ret;
+
+       if (!whole_cluster(ploop, pio)) {
+               /*
+                * Despite discard_granularity being set, the block layer
+                * may submit shorter reqs, e.g., boundary bios around a
+                * trimmed contiguous hunk. For discard it's OK to just
+                * ignore such reqs. Keep this in mind when implementing
+                * REQ_OP_WRITE_ZEROES etc.
+                */
+               pio_endio(pio);
+               return;
+       }
+
+       if (!cluster_is_in_top_delta(ploop, clu)) {
+               pio_endio(pio);
+               return;
+       }
+
+       /* We can't end with EOPNOTSUPP, since blk-mq prints error */
+       if (ploop->nr_deltas != 1)
+               goto punch_hole;
+
+       spin_lock_irqsave(&ploop->inflight_lock, flags);
+       inflight_h = find_inflight_bio(ploop, clu);
+       if (inflight_h)
+               add_endio_pio(inflight_h, pio);
+       spin_unlock_irqrestore(&ploop->inflight_lock, flags);
+
+       if (inflight_h) {
+               /* @pio will be requeued on inflight_h's pio end */
+               pr_err_once("ploop: delayed discard: device is used as raw?\n");
+               return;
+       }
+
+       add_cluster_lk(ploop, pio, clu);
+       pio->wants_discard_index_cleanup = true;
+
+punch_hole:
+       remap_to_cluster(ploop, pio, dst_clu);
+       pos = to_bytes(pio->bi_iter.bi_sector);
+       ret = punch_hole(top_delta(ploop)->file, pos, pio->bi_iter.bi_size);
+       if (ret || ploop->nr_deltas != 1) {
+               if (ret)
+                       pio->bi_status = errno_to_blk_status(ret);
+               pio_endio(pio);
+               return;
+       }
+
+       queue_discard_index_wb(ploop, pio);
+}
+
+static void ploop_discard_index_pio_end(struct ploop *ploop, struct pio *pio)
+{
+       del_cluster_lk(ploop, pio);
+}
+
+static void queue_or_fail(struct ploop *ploop, int err, void *data)
+{
+       struct pio *pio = data;
+
+       /* FIXME: do we use BLK_STS_AGAIN? */
+       if (err && err != BLK_STS_AGAIN) {
+               pio->bi_status = errno_to_blk_status(err);
+               pio_endio(pio);
+       } else {
+               dispatch_pios(ploop, pio, NULL);
+       }
+}
+
+static void complete_cow(struct ploop_cow *cow, blk_status_t bi_status)
+{
+       struct pio *aux_pio = cow->aux_pio;
+       struct ploop *ploop = cow->ploop;
+       u32 dst_clu = cow->dst_clu;
+       unsigned long flags;
+       struct pio *cow_pio;
+
+       WARN_ON_ONCE(!list_empty(&aux_pio->list));
+       cow_pio = cow->cow_pio;
+
+       del_cluster_lk(ploop, cow_pio);
+
+       if (dst_clu != BAT_ENTRY_NONE && bi_status != BLK_STS_OK) {
+               read_lock_irqsave(&ploop->bat_rwlock, flags);
+               ploop_hole_set_bit(dst_clu, ploop);
+               read_unlock_irqrestore(&ploop->bat_rwlock, flags);
+       }
+
+       queue_or_fail(ploop, blk_status_to_errno(bi_status), cow_pio);
+
+       queue_work(ploop->wq, &ploop->worker);
+       free_pio_with_pages(ploop, cow->aux_pio);
+       kmem_cache_free(cow_cache, cow);
+}
+
+static void ploop_release_cluster(struct ploop *ploop, u32 clu)
+{
+       u32 id, *bat_entries, dst_clu;
+       struct md_page *md;
+
+       lockdep_assert_held(&ploop->bat_rwlock);
+
+       id = bat_clu_to_page_nr(clu);
+       md = md_page_find(ploop, id);
+       BUG_ON(!md);
+
+       clu = bat_clu_idx_in_page(clu); /* relative to page */
+
+       bat_entries = kmap_atomic(md->page);
+       dst_clu = bat_entries[clu];
+       bat_entries[clu] = BAT_ENTRY_NONE;
+       md->bat_levels[clu] = 0;
+       kunmap_atomic(bat_entries);
+
+       ploop_hole_set_bit(dst_clu, ploop);
+}
+
+static void piwb_discard_completed(struct ploop *ploop, bool success,
+                                  u32 clu, u32 new_dst_clu)
+{
+       if (new_dst_clu)
+               return;
+
+       if (cluster_is_in_top_delta(ploop, clu)) {
+               WARN_ON_ONCE(ploop->nr_deltas != 1);
+               if (success)
+                       ploop_release_cluster(ploop, clu);
+       }
+}
+
+/*
+ * Update local BAT copy with written indexes on success.
+ * Mark allocated clusters as holes on failure.
+ * FIXME: a failure may mean some sectors are written, so
+ * we have to reread BAT page to check that.
+ */
+static void ploop_advance_local_after_bat_wb(struct ploop *ploop,
+                                            struct ploop_index_wb *piwb,
+                                            bool success)
+{
+       struct md_page *md = piwb->md;
+       u32 i, last, *bat_entries;
+       map_index_t *dst_clu, off;
+       unsigned long flags;
+       LIST_HEAD(list);
+
+       BUG_ON(!md);
+       bat_entries = kmap_atomic(md->page);
+
+       /* Absolute number of first index in page (negative for page#0) */
+       off = piwb->page_id * PAGE_SIZE / sizeof(map_index_t);
+       off -= PLOOP_MAP_OFFSET;
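+       /*
+        * E.g., with 4K pages and 4-byte map entries each page holds 1024
+        * slots; page 0 starts with PLOOP_MAP_OFFSET header slots, so its
+        * first BAT entry has absolute index 0 at in-page slot
+        * PLOOP_MAP_OFFSET (i.e., i + off == 0).
+        */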
+
+       /* Last and first index in copied page */
+       last = ploop->nr_bat_entries - off;
+       if (last > PAGE_SIZE / sizeof(map_index_t))
+               last = PAGE_SIZE / sizeof(map_index_t);
+       i = 0;
+       if (!piwb->page_id)
+               i = PLOOP_MAP_OFFSET;
+
+       dst_clu = kmap_atomic(piwb->bat_page);
+       write_lock_irqsave(&ploop->bat_rwlock, flags);
+
+       for (; i < last; i++) {
+               if (piwb->type == PIWB_TYPE_DISCARD) {
+                       piwb_discard_completed(ploop, success, i + off, dst_clu[i]);
+                       continue;
+               }
+
+               if (!dst_clu[i])
+                       continue;
+
+               if (cluster_is_in_top_delta(ploop, i + off) && piwb->type == PIWB_TYPE_ALLOC) {
+                       WARN_ON(bat_entries[i] != dst_clu[i]);
+                       continue;
+               }
+
+               if (success) {
+                       bat_entries[i] = dst_clu[i];
+                       md->bat_levels[i] = top_level(ploop);
+               } else {
+                       ploop_hole_set_bit(i + off, ploop);
+               }
+       }
+
+       WARN_ON_ONCE(!(md->status & MD_WRITEBACK));
+       md->status &= ~MD_WRITEBACK;
+       md->piwb = NULL;
+       list_splice_tail_init(&md->wait_list, &list);
+       write_unlock_irqrestore(&ploop->bat_rwlock, flags);
+       kunmap_atomic(dst_clu);
+       kunmap_atomic(bat_entries);
+
+       if (!list_empty(&list))
+               dispatch_pios(ploop, NULL, &list);
+}
+
+static void free_piwb(struct ploop_index_wb *piwb)
+{
+       free_pio(piwb->ploop, piwb->pio);
+       put_page(piwb->bat_page);
+       kfree(piwb);
+}
+
+static void put_piwb(struct ploop_index_wb *piwb)
+{
+       if (atomic_dec_and_test(&piwb->count)) {
+               struct ploop *ploop = piwb->ploop;
+               /*
+                * Index wb failed. Mark clusters as unallocated again.
+                * piwb->count is zero, so all data writers completed.
+                */
+               if (piwb->bi_status)
+                       ploop_advance_local_after_bat_wb(ploop, piwb, false);
+
+               if (piwb->comp) {
+                       if (piwb->comp_bi_status)
+                               *piwb->comp_bi_status = piwb->bi_status;
+                       complete(piwb->comp);
+               }
+               free_piwb(piwb);
+       }
+}
+
+/* This handler is called after BAT is updated. */
+static void ploop_bat_write_complete(struct ploop_index_wb *piwb,
+                                    blk_status_t bi_status)
+{
+       struct ploop *ploop = piwb->ploop;
+       struct pio *aux_pio;
+       struct ploop_cow *cow;
+       struct pio *data_pio;
+       unsigned long flags;
+
+       if (!bi_status) {
+               /*
+                * Success: now update local BAT copy. We could do this
+                * from our delayed work, but we want to publish new
+                * mapping in the fastest way. This must be done before
+                * data bios completion, since right after we complete
+                * a bio, subsequent read wants to see written data
+                * (ploop_map() wants to see non-zero bat_entries[.]).
+                */
+               ploop_advance_local_after_bat_wb(ploop, piwb, true);
+       }
+
+       spin_lock_irqsave(&piwb->lock, flags);
+       piwb->completed = true;
+       piwb->bi_status = bi_status;
+       spin_unlock_irqrestore(&piwb->lock, flags);
+
+       /*
+        * End pending data bios. Unlocked, as nobody can
+        * add a new element after piwb->completed is true.
+        */
+       while ((data_pio = pio_list_pop(&piwb->ready_data_pios)) != NULL) {
+               if (bi_status)
+                       data_pio->bi_status = bi_status;
+               pio_endio(data_pio);
+       }
+
+       while ((aux_pio = pio_list_pop(&piwb->cow_list))) {
+               cow = aux_pio->endio_cb_data;
+               complete_cow(cow, bi_status);
+       }
+
+       /*
+        * If the BAT update failed, dst_clusters will be
+        * returned to holes_bitmap on the last put_piwb().
+        */
+       put_piwb(piwb);
+}
+
+static int ploop_prepare_bat_update(struct ploop *ploop, struct md_page *md,
+                                   enum piwb_type type)
+{
+       u32 i, off, last, *bat_entries;
+       struct ploop_index_wb *piwb;
+       bool is_last_page = true;
+       u32 page_id = md->id;
+       struct page *page;
+       struct pio *pio;
+       map_index_t *to;
+
+       piwb = kmalloc(sizeof(*piwb), GFP_NOIO);
+       if (!piwb)
+               return -ENOMEM;
+       ploop_index_wb_init(piwb, ploop);
+
+       piwb->bat_page = page = alloc_page(GFP_NOIO);
+       piwb->pio = pio = alloc_pio(ploop, GFP_NOIO);
+       if (!page || !pio)
+               goto err;
+       init_pio(ploop, REQ_OP_WRITE, pio);
+
+       bat_entries = kmap_atomic(md->page);
+
+       write_lock_irq(&ploop->bat_rwlock);
+       md->piwb = piwb;
+       piwb->md = md;
+       write_unlock_irq(&ploop->bat_rwlock);
+
+       piwb->page_id = page_id;
+       to = kmap_atomic(page);
+       memcpy((void *)to, bat_entries, PAGE_SIZE);
+
+       /* Absolute number of first index in page (negative for page#0) */
+       off = page_id * PAGE_SIZE / sizeof(map_index_t);
+       off -= PLOOP_MAP_OFFSET;
+
+       /* Last and first index in copied page */
+       last = ploop->nr_bat_entries - off;
+       if (last > PAGE_SIZE / sizeof(map_index_t)) {
+               last = PAGE_SIZE / sizeof(map_index_t);
+               is_last_page = false;
+       }
+       i = 0;
+       if (!page_id)
+               i = PLOOP_MAP_OFFSET;
+
+       /* Zero entries not owned by the top delta (BAT goes right after hdr, see .ctr) */
+       for (; i < last; i++) {
+               if (cluster_is_in_top_delta(ploop, i + off))
+                       continue;
+               to[i] = 0;
+       }
+       if (is_last_page) {
+               /* Fill tail of page with 0 */
+               for (i = last; i < PAGE_SIZE / sizeof(map_index_t); i++)
+                       to[i] = 0;
+       }
+
+       kunmap_atomic(to);
+       kunmap_atomic(bat_entries);
+
+       piwb->type = type;
+       return 0;
+err:
+       free_piwb(piwb);
+       return -ENOMEM;
+}
+
+void ploop_break_bat_update(struct ploop *ploop, struct md_page *md)
+{
+       struct ploop_index_wb *piwb;
+       unsigned long flags;
+
+       write_lock_irqsave(&ploop->bat_rwlock, flags);
+       piwb = md->piwb;
+       md->piwb = NULL;
+       write_unlock_irqrestore(&ploop->bat_rwlock, flags);
+
+       free_piwb(piwb);
+}
+
+static void ploop_bat_page_zero_cluster(struct ploop *ploop,
+                                       struct ploop_index_wb *piwb,
+                                       u32 clu)
+{
+       map_index_t *to;
+
+       /* Cluster index related to the page[page_id] start */
+       clu = bat_clu_idx_in_page(clu);
+
+       to = kmap_atomic(piwb->bat_page);
+       to[clu] = 0;
+       kunmap_atomic(to);
+}
+
+static int find_dst_clu_bit(struct ploop *ploop,
+                     u32 *ret_dst_clu)
+{
+       u32 dst_clu;
+
+       /* Find empty clu */
+       dst_clu = find_first_bit(ploop->holes_bitmap, ploop->hb_nr);
+       if (dst_clu >= ploop->hb_nr)
+               return -EIO;
+       *ret_dst_clu = dst_clu;
+       return 0;
+}
+
+static int truncate_prealloc_safe(struct ploop *ploop, struct ploop_delta *delta,
+                                 loff_t len, const char *func)
+{
+       struct file *file = delta->file;
+       loff_t old_len = delta->file_size;
+       loff_t new_len = len;
+       int ret;
+
+       if (new_len <= old_len)
+               return 0;
+       new_len = ALIGN(new_len, PREALLOC_SIZE);
+
+       if (!ploop->falloc_new_clu)
+               ret = vfs_truncate2(&file->f_path, new_len, file);
+       else
+               ret = vfs_fallocate(file, 0, old_len, new_len - old_len);
+       if (ret) {
+               pr_err("ploop: %s->prealloc: %d\n", func, ret);
+               return ret;
+       }
+
+       ret = vfs_fsync(file, 0);
+       if (ret) {
+               pr_err("ploop: %s->fsync(): %d\n", func, ret);
+               return ret;
+       }
+
+       delta->file_size = new_len;
+       delta->file_preallocated_area_start = len;
+       return 0;
+}
+
+static int allocate_cluster(struct ploop *ploop, u32 *dst_clu)
+{
+       struct ploop_delta *top = top_delta(ploop);
+       u32 clu_size = CLU_SIZE(ploop);
+       loff_t off, pos, end, old_size;
+       struct file *file = top->file;
+       int ret;
+
+       if (find_dst_clu_bit(ploop, dst_clu) < 0)
+               return -EIO;
+
+       pos = CLU_TO_POS(ploop, *dst_clu);
+       end = pos + clu_size;
+       old_size = top->file_size;
+
+       if (pos < top->file_preallocated_area_start) {
+               /* Clu at @pos may contain dirty data */
+               off = min_t(loff_t, old_size, end);
+               if (!ploop->falloc_new_clu)
+                       ret = punch_hole(file, pos, off - pos);
+               else
+                       ret = zero_range(file, pos, off - pos);
+               if (ret) {
+                       pr_err("ploop: punch/zero area: %d\n", ret);
+                       return ret;
+               }
+       }
+
+       if (end > old_size) {
+               ret = truncate_prealloc_safe(ploop, top, end, __func__);
+               if (ret)
+                       return ret;
+       } else if (pos < top->file_preallocated_area_start) {
+               /*
+                * Flush punch_hole()/zero_range() modifications.
+        * TODO: track recently unused blocks and do that
+                * in background.
+                */
+               ret = vfs_fsync(file, 0);
+               if (ret)
+                       return ret;
+       }
+
+       if (end > top->file_preallocated_area_start)
+               top->file_preallocated_area_start = end;
+       /*
+        * Mark clu as used. Find & clear bit is unlocked,
+        * since currently this may be called only from deferred
+        * kwork. Note that set_bit may be called from many places.
+        */
+       ploop_hole_clear_bit(*dst_clu, ploop);
+       return 0;
+}
+
+/*
+ * This finds a free dst_clu on origin device, and reflects this
+ * in ploop->holes_bitmap and bat_page.
+ */
+static int ploop_alloc_cluster(struct ploop *ploop, struct ploop_index_wb *piwb,
+                              u32 clu, u32 *dst_clu)
+{
+       struct page *page = piwb->bat_page;
+       bool already_alloced = false;
+       map_index_t *to;
+       int ret = 0;
+
+       /* Cluster index related to the page[page_id] start */
+       clu -= piwb->page_id * PAGE_SIZE / sizeof(map_index_t) - PLOOP_MAP_OFFSET;
+
+       to = kmap_atomic(page);
+       if (to[clu]) {
+               /* Already mapped by one of previous bios */
+               *dst_clu = to[clu];
+               already_alloced = true;
+       }
+       kunmap_atomic(to);
+
+       if (already_alloced)
+               goto out;
+
+       ret = allocate_cluster(ploop, dst_clu);
+       if (ret < 0)
+               goto out;
+
+       to = kmap_atomic(page);
+       to[clu] = *dst_clu;
+       kunmap_atomic(to);
+out:
+       return ret;
+}
+
+static bool ploop_data_pio_end(struct pio *pio)
+{
+       struct ploop_index_wb *piwb = pio->piwb;
+       unsigned long flags;
+       bool completed;
+
+       spin_lock_irqsave(&piwb->lock, flags);
+       completed = piwb->completed;
+       if (!completed)
+               list_add_tail(&pio->list, &piwb->ready_data_pios);
+       else if (!pio->bi_status)
+               pio->bi_status = piwb->bi_status;
+       spin_unlock_irqrestore(&piwb->lock, flags);
+
+       put_piwb(piwb);
+
+       return completed;
+}
+
+static void ploop_attach_end_action(struct pio *pio, struct ploop_index_wb *piwb)
+{
+       pio->is_data_alloc = true;
+       pio->piwb = piwb;
+
+       atomic_inc(&piwb->count);
+}
+
+static void ploop_queue_resubmit(struct pio *pio)
+{
+       struct ploop *ploop = pio->ploop;
+       unsigned long flags;
+
+       pio->queue_list_id = PLOOP_LIST_INVALID;
+
+       spin_lock_irqsave(&ploop->deferred_lock, flags);
+       list_add_tail(&pio->list, &ploop->resubmit_pios);
+       spin_unlock_irqrestore(&ploop->deferred_lock, flags);
+
+       queue_work(ploop->wq, &ploop->worker);
+}
+
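+/*
+ * Completion of reads/writes issued to delta files via ploop_call_rw_iter().
+ * A short transfer (0 < ret < bi_size) is not an error: the pio is advanced
+ * and requeued via ploop_queue_resubmit().
+ */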
+static void data_rw_complete(struct pio *pio)
+{
+       bool completed;
+
+       if (pio->ret != pio->bi_iter.bi_size) {
+               if (pio->ret >= 0) {
+                       /* Partial IO */
+                       WARN_ON_ONCE(pio->ret == 0);
+                       pio_advance(pio, pio->ret);
+                       ploop_queue_resubmit(pio);
+                       return;
+               }
+               pio->bi_status = errno_to_blk_status(pio->ret);
+       }
+
+       if (pio->is_data_alloc) {
+               completed = ploop_data_pio_end(pio);
+               if (!completed)
+                       return;
+       }
+
+       pio_endio(pio);
+}
+
+/*
+ * XXX: Keep in mind that data_rw_complete may queue a resubmit after partial IO.
+ * Don't use this function from the fsync kwork if the caller blocks waiting
+ * for completion, since that kwork is what resubmits after partial IO.
+ */
+static void submit_rw_mapped(struct ploop *ploop, struct pio *pio)
+{
+       struct cgroup_subsys_state *css = pio->css;
+       unsigned int rw, nr_segs;
+       struct bio_vec *bvec;
+       struct iov_iter iter;
+       struct file *file;
+       loff_t pos;
+
+       BUG_ON(pio->level > top_level(ploop));
+
+       pio->complete = data_rw_complete;
+
+       rw = (op_is_write(pio->bi_op) ? WRITE : READ);
+       nr_segs = pio_nr_segs(pio);
+       bvec = __bvec_iter_bvec(pio->bi_io_vec, pio->bi_iter);
+
+       iov_iter_bvec(&iter, rw, bvec, nr_segs, pio->bi_iter.bi_size);
+       iter.iov_offset = pio->bi_iter.bi_bvec_done;
+
+       pos = to_bytes(pio->bi_iter.bi_sector);
+
+       file = ploop->deltas[pio->level].file;
+
+       if (css)
+               kthread_associate_blkcg(pio->css);
+       /* Don't touch @pio after that */
+       ploop_call_rw_iter(file, pos, rw, &iter, pio);
+       if (css)
+               kthread_associate_blkcg(NULL);
+}
+
+void map_and_submit_rw(struct ploop *ploop, u32 dst_clu, struct pio *pio, u8 level)
+{
+       remap_to_cluster(ploop, pio, dst_clu);
+       pio->level = level;
+
+       submit_rw_mapped(ploop, pio);
+}
+
+static void initiate_delta_read(struct ploop *ploop, unsigned int level,
+                               u32 dst_clu, struct pio *pio)
+{
+       if (dst_clu == BAT_ENTRY_NONE) {
+               /* No delta contains this clu. */
+               zero_fill_pio(pio);
+               pio_endio(pio);
+               return;
+       }
+
+       map_and_submit_rw(ploop, dst_clu, pio, level);
+}
+
+static void ploop_cow_endio(struct pio *aux_pio, void *data, blk_status_t bi_status)
+{
+       struct ploop_cow *cow = data;
+       struct ploop *ploop = cow->ploop;
+
+       aux_pio->queue_list_id = PLOOP_LIST_COW;
+       dispatch_pios(ploop, aux_pio, NULL);
+}
+
+static bool postpone_if_cluster_locked(struct ploop *ploop, struct pio *pio, u32 clu)
+{
+       struct pio *e_h; /* Exclusively locked */
+
+       spin_lock_irq(&ploop->deferred_lock);
+       e_h = find_lk_of_cluster(ploop, clu);
+       if (e_h)
+               add_endio_pio(e_h, pio);
+       spin_unlock_irq(&ploop->deferred_lock);
+
+       return e_h != NULL;
+}
+
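+/*
+ * COW runs in three stages: read the whole clu from a lower delta (#0),
+ * write it into a newly allocated clu of the top delta (#1), then update
+ * the BAT index (#2). Stages #1 and #2 are driven by process_delta_cow().
+ */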
+static int submit_cluster_cow(struct ploop *ploop, unsigned int level,
+                             u32 clu, u32 dst_clu, struct pio *cow_pio)
+{
+       struct ploop_cow *cow = NULL;
+       struct pio *aux_pio = NULL;
+
+       /* Prepare new delta read */
+       aux_pio = alloc_pio_with_pages(ploop);
+       cow = kmem_cache_alloc(cow_cache, GFP_NOIO);
+       if (!aux_pio || !cow)
+               goto err;
+       init_pio(ploop, REQ_OP_READ, aux_pio);
+       aux_pio->css = cow_pio->css;
+       pio_prepare_offsets(ploop, aux_pio, clu);
+       aux_pio->endio_cb = ploop_cow_endio;
+       aux_pio->endio_cb_data = cow;
+
+       cow->ploop = ploop;
+       cow->dst_clu = BAT_ENTRY_NONE;
+       cow->aux_pio = aux_pio;
+       cow->cow_pio = cow_pio;
+
+       add_cluster_lk(ploop, cow_pio, clu);
+
+       /* Stage #0: read secondary delta full clu */
+       map_and_submit_rw(ploop, dst_clu, aux_pio, level);
+       return 0;
+err:
+       if (aux_pio)
+               free_pio_with_pages(ploop, aux_pio);
+       kfree(cow);
+       return -ENOMEM;
+}
+
+static void initiate_cluster_cow(struct ploop *ploop, unsigned int level,
+                                u32 clu, u32 dst_clu, struct pio *pio)
+{
+       if (!submit_cluster_cow(ploop, level, clu, dst_clu, pio))
+               return;
+
+       pio->bi_status = BLK_STS_RESOURCE;
+       pio_endio(pio);
+}
+
+static void submit_cluster_write(struct ploop_cow *cow)
+{
+       struct pio *aux_pio = cow->aux_pio;
+       struct ploop *ploop = cow->ploop;
+       u32 dst_clu;
+       int ret;
+
+       ret = allocate_cluster(ploop, &dst_clu);
+       if (unlikely(ret < 0))
+               goto error;
+       cow->dst_clu = dst_clu;
+
+       init_pio(ploop, REQ_OP_WRITE, aux_pio);
+       aux_pio->css = cow->cow_pio->css;
+       pio_prepare_offsets(ploop, aux_pio, dst_clu);
+
+       BUG_ON(irqs_disabled());
+       aux_pio->endio_cb = ploop_cow_endio;
+       aux_pio->endio_cb_data = cow;
+
+       map_and_submit_rw(ploop, dst_clu, aux_pio, top_level(ploop));
+       return;
+error:
+       complete_cow(cow, errno_to_blk_status(ret));
+}
+
+static void submit_cow_index_wb(struct ploop_cow *cow)
+{
+       struct pio *cow_pio = cow->cow_pio;
+       struct ploop *ploop = cow->ploop;
+       u32 page_id, clu = cow_pio->clu;
+       struct ploop_index_wb *piwb;
+       struct md_page *md;
+       map_index_t *to;
+
+       WARN_ON_ONCE(cow->aux_pio->queue_list_id != PLOOP_LIST_COW);
+       page_id = bat_clu_to_page_nr(clu);
+       md = md_page_find(ploop, page_id);
+
+       if (delay_if_md_busy(ploop, md, PIWB_TYPE_ALLOC, cow->aux_pio))
+               goto out;
+
+       if (!(md->status & MD_DIRTY)) {
+               /* Unlocked, since MD_DIRTY is set and cleared from this work */
+               if (ploop_prepare_bat_update(ploop, md, PIWB_TYPE_ALLOC) < 0)
+                       goto err_resource;
+               ploop_md_make_dirty(ploop, md);
+       }
+
+       piwb = md->piwb;
+
+       clu -= page_id * PAGE_SIZE / sizeof(map_index_t) - PLOOP_MAP_OFFSET;
+
+       to = kmap_atomic(piwb->bat_page);
+       WARN_ON(to[clu]);
+       to[clu] = cow->dst_clu;
+       kunmap_atomic(to);
+
+       /* Prevent double clearing of holes_bitmap bit on complete_cow() */
+       cow->dst_clu = BAT_ENTRY_NONE;
+       spin_lock_irq(&ploop->deferred_lock);
+       list_add_tail(&cow->aux_pio->list, &piwb->cow_list);
+       spin_unlock_irq(&ploop->deferred_lock);
+out:
+       return;
+err_resource:
+       complete_cow(cow, BLK_STS_RESOURCE);
+}
+
+static void process_delta_cow(struct ploop *ploop, struct list_head *cow_list)
+{
+       struct ploop_cow *cow;
+       struct pio *aux_pio;
+
+       if (list_empty(cow_list))
+               return;
+
+       while ((aux_pio = pio_list_pop(cow_list)) != NULL) {
+               cow = aux_pio->endio_cb_data;
+               if (unlikely(aux_pio->bi_status != BLK_STS_OK)) {
+                       complete_cow(cow, aux_pio->bi_status);
+                       continue;
+               }
+
+               if (cow->dst_clu == BAT_ENTRY_NONE) {
+                       /*
+                        * Stage #1: assign dst_clu and write data
+                        * to top delta.
+                        */
+                       submit_cluster_write(cow);
+               } else {
+                       /*
+                        * Stage #2: data is written to top delta.
+                        * Update index.
+                        */
+                       submit_cow_index_wb(cow);
+               }
+       }
+}
+
+/*
+ * This allocates a new clu (if clu wb is not pending yet),
+ * or tries to attach a bio to a planned page index wb.
+ *
+ * We want to update BAT indexes in batch, but we don't want to delay data
+ * bios submitting till the batch is assembled, submitted and completed.
+ * This function tries to submit data bios before indexes are written
+ * on disk.
+ * Original bio->bi_end_io mustn't be called before index wb is completed.
+ * We handle this in ploop_attach_end_action() via a specific callback
+ * for ploop_data_pio_end().
+ * Note: the clu never becomes locked here, since the index update is called
+ * synchronously. Keep this in mind in case you make it async.
+ */
+static bool locate_new_cluster_and_attach_pio(struct ploop *ploop,
+                                             struct md_page *md,
+                                             u32 clu, u32 *dst_clu,
+                                             struct pio *pio)
+{
+       bool bat_update_prepared = false;
+       struct ploop_index_wb *piwb;
+       bool attached = false;
+       u32 page_id;
+       int err;
+
+       WARN_ON_ONCE(pio->queue_list_id != PLOOP_LIST_DEFERRED);
+       if (delay_if_md_busy(ploop, md, PIWB_TYPE_ALLOC, pio))
+               goto out;
+
+       if (!(md->status & MD_DIRTY)) {
+               /* Unlocked, since MD_DIRTY is set and cleared from this work */
+               page_id = bat_clu_to_page_nr(clu);
+               if (ploop_prepare_bat_update(ploop, md, PIWB_TYPE_ALLOC) < 0) {
+                       pio->bi_status = BLK_STS_RESOURCE;
+                       goto error;
+               }
+               bat_update_prepared = true;
+       }
+
+       piwb = md->piwb;
+
+       err = ploop_alloc_cluster(ploop, piwb, clu, dst_clu);
+       if (err) {
+               pio->bi_status = errno_to_blk_status(err);
+               goto error;
+       }
+
+       if (bat_update_prepared)
+               ploop_md_make_dirty(ploop, md);
+
+       ploop_attach_end_action(pio, piwb);
+       attached = true;
+out:
+       return attached;
+error:
+       /* Uninit piwb */
+       if (bat_update_prepared)
+               ploop_break_bat_update(ploop, md);
+       pio_endio(pio);
+       return false;
+}
+
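+/*
+ * Mapping decision for one deferred pio: postpone if the clu is exclusively
+ * locked; handle discard; submit directly if the clu is already in the top
+ * delta; read from a lower delta; COW on write over a lower delta; otherwise
+ * allocate a new clu in the top delta.
+ */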
+static int process_one_deferred_bio(struct ploop *ploop, struct pio *pio)
+{
+       sector_t sector = pio->bi_iter.bi_sector;
+       struct md_page *md;
+       u32 clu, dst_clu;
+       u8 level;
+       bool ret;
+
+       /*
+        * Unlocked, since no one can update BAT in parallel:
+        * we update BAT only 1) from *this* kwork, and 2) from
+        * ploop_advance_local_after_bat_wb(), which we start
+        * and wait synchronously from *this* kwork.
+        */
+       clu = SEC_TO_CLU(ploop, sector);
+       dst_clu = ploop_bat_entries(ploop, clu, &level, &md);
+
+       if (postpone_if_cluster_locked(ploop, pio, clu))
+               goto out;
+
+       if (op_is_discard(pio->bi_op)) {
+               handle_discard_pio(ploop, pio, clu, dst_clu);
+               goto out;
+       }
+
+       if (cluster_is_in_top_delta(ploop, clu)) {
+               /* Already mapped */
+               if (pio_endio_if_merge_fake_pio(pio))
+                       goto out;
+               goto queue;
+       } else if (!op_is_write(pio->bi_op)) {
+               /*
+                * Simple read from secondary delta. May fail.
+                * (Also handles the case dst_clu == BAT_ENTRY_NONE).
+                */
+               initiate_delta_read(ploop, level, dst_clu, pio);
+               goto out;
+       } else if (dst_clu != BAT_ENTRY_NONE) {
+               /*
+                * Read the secondary delta and write to the top delta. May fail.
+                * Yes, we could optimize the whole-clu-write case and
+                * a lot of other corner cases, but we don't, since
+                * snapshots are used and COW occurs very rarely.
+                */
+               initiate_cluster_cow(ploop, level, clu, dst_clu, pio);
+               goto out;
+       }
+
+       if (unlikely(pio_endio_if_all_zeros(pio)))
+               goto out;
+
+       /* Cluster exists nowhere. Allocate it and set up the pio as outrunning */
+       ret = locate_new_cluster_and_attach_pio(ploop, md, clu, &dst_clu, pio);
+       if (!ret)
+               goto out;
+queue:
+       link_submitting_pio(ploop, pio, clu);
+
+       map_and_submit_rw(ploop, dst_clu, pio, top_level(ploop));
+out:
+       return 0;
+}
+
+static void md_fsync_endio(struct pio *pio, void *piwb_ptr, blk_status_t bi_status)
+{
+       struct ploop_index_wb *piwb = piwb_ptr;
+
+       ploop_bat_write_complete(piwb, bi_status);
+}
+
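+/*
+ * Endio of a BAT page write: record the written dst_clu via
+ * track_dst_cluster() and, on success, chain a REQ_OP_FLUSH, so that
+ * ploop_bat_write_complete() runs only after the index is durable.
+ */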
+static void md_write_endio(struct pio *pio, void *piwb_ptr, blk_status_t bi_status)
+{
+       struct ploop_index_wb *piwb = piwb_ptr;
+       struct ploop *ploop = piwb->ploop;
+       u32 dst_clu;
+
+       dst_clu = POS_TO_CLU(ploop, (u64)piwb->page_id << PAGE_SHIFT);
+       track_dst_cluster(ploop, dst_clu);
+
+       if (bi_status) {
+               md_fsync_endio(pio, piwb, bi_status);
+       } else {
+               init_pio(ploop, REQ_OP_FLUSH, pio);
+               pio->endio_cb = md_fsync_endio;
+               pio->endio_cb_data = piwb;
+
+               pio->queue_list_id = PLOOP_LIST_FLUSH;
+               dispatch_pios(ploop, pio, NULL);
+       }
+}
+
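+/* Write the single BAT page of @piwb to the top delta at its backing offset */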
+void ploop_index_wb_submit(struct ploop *ploop, struct ploop_index_wb *piwb)
+{
+       loff_t pos = (loff_t)piwb->page_id << PAGE_SHIFT;
+       struct pio *pio = piwb->pio;
+       struct bio_vec bvec = {
+               .bv_page = piwb->bat_page,
+               .bv_len = PAGE_SIZE,
+               .bv_offset = 0,
+       };
+
+       pio->bi_iter.bi_sector = to_sector(pos);
+       pio->bi_iter.bi_size = PAGE_SIZE;
+       pio->bi_iter.bi_idx = 0;
+       pio->bi_iter.bi_bvec_done = 0;
+       pio->bi_io_vec = &bvec;
+       pio->level = top_level(ploop);
+       pio->endio_cb = md_write_endio;
+       pio->endio_cb_data = piwb;
+
+       submit_rw_mapped(ploop, pio);
+}
+
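+/*
+ * Flatten the bvecs of all bios in @rq into one array,
+ * so the whole request can be iterated with a single bvec_iter.
+ */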
+static struct bio_vec *create_bvec_from_rq(struct request *rq)
+{
+       struct bio_vec bv, *bvec, *tmp;
+       struct req_iterator rq_iter;
+       unsigned int nr_bvec = 0;
+
+       rq_for_each_bvec(bv, rq, rq_iter)
+               nr_bvec++;
+
+       bvec = kmalloc_array(nr_bvec, sizeof(struct bio_vec),
+                            GFP_NOIO);
+       if (!bvec)
+               goto out;
+
+       tmp = bvec;
+       rq_for_each_bvec(bv, rq, rq_iter) {
+               *tmp = bv;
+               tmp++;
+       }
+out:
+       return bvec;
+}
+
+static void prepare_one_embedded_pio(struct ploop *ploop, struct pio *pio,
+                                    struct list_head *deferred_pios)
+{
+       struct ploop_rq *prq = pio->endio_cb_data;
+       struct request *rq = prq->rq;
+       struct bio_vec *bvec = NULL;
+       LIST_HEAD(list);
+       int ret;
+
+       if (rq->bio != rq->biotail) {
+               if (req_op(rq) == REQ_OP_DISCARD)
+                       goto skip_bvec;
+               /*
+                * Transform a set of bvec arrays related to bios
+                * into a single bvec array (which we can iterate).
+                */
+               bvec = create_bvec_from_rq(rq);
+               if (!bvec)
+                       goto err_nomem;
+               prq->bvec = bvec;
+skip_bvec:
+               pio->bi_iter.bi_sector = blk_rq_pos(rq);
+               pio->bi_iter.bi_size = blk_rq_bytes(rq);
+               pio->bi_iter.bi_idx = 0;
+               pio->bi_iter.bi_bvec_done = 0;
+       } else {
+               /* Single bio already provides bvec array */
+               bvec = rq->bio->bi_io_vec;
+
+               pio->bi_iter = rq->bio->bi_iter;
+       }
+       pio->bi_io_vec = bvec;
+
+       pio->queue_list_id = PLOOP_LIST_DEFERRED;
+       ret = split_pio_to_list(ploop, pio, deferred_pios);
+       if (ret)
+               goto err_nomem;
+
+       return;
+err_nomem:
+       pio->bi_status = BLK_STS_IOERR;
+       pio_endio(pio);
+}
+
+static void prepare_embedded_pios(struct ploop *ploop, struct list_head *pios,
+                                 struct list_head *deferred_pios)
+{
+       struct pio *pio;
+
+       while ((pio = pio_list_pop(pios)) != NULL)
+               prepare_one_embedded_pio(ploop, pio, deferred_pios);
+}
+
+static void process_deferred_pios(struct ploop *ploop, struct list_head *pios)
+{
+       struct pio *pio;
+
+       while ((pio = pio_list_pop(pios)) != NULL)
+               process_one_deferred_bio(ploop, pio);
+}
+
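+/*
+ * Zero the BAT entry of the clu being discarded in the pending index
+ * writeback page; the pio is queued to piwb->ready_data_pios and
+ * completes together with that index wb.
+ */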
+static void process_one_discard_pio(struct ploop *ploop, struct pio *pio)
+{
+       bool bat_update_prepared = false;
+       u32 page_id, clu = pio->clu;
+       struct ploop_index_wb *piwb;
+       struct md_page *md;
+       map_index_t *to;
+
+       WARN_ON(ploop->nr_deltas != 1 ||
+               pio->queue_list_id != PLOOP_LIST_DISCARD);
+
+       page_id = bat_clu_to_page_nr(clu);
+       md = md_page_find(ploop, page_id);
+       if (delay_if_md_busy(ploop, md, PIWB_TYPE_DISCARD, pio))
+               goto out;
+
+       if (!(md->status & MD_DIRTY)) {
+                /* Unlocked since MD_DIRTY is set and cleared from this work */
+               if (ploop_prepare_bat_update(ploop, md, PIWB_TYPE_DISCARD) < 0) {
+                       pio->bi_status = BLK_STS_RESOURCE;
+                       goto err;
+               }
+               bat_update_prepared = true;
+       }
+
+       piwb = md->piwb;
+
+       /* Cluster index related to the page[page_id] start */
+       clu -= piwb->page_id * PAGE_SIZE / sizeof(map_index_t) - PLOOP_MAP_OFFSET;
+
+       to = kmap_atomic(piwb->bat_page);
+       if (WARN_ON_ONCE(!to[clu])) {
+               pio->bi_status = BLK_STS_IOERR;
+               goto err;
+       } else {
+               to[clu] = 0;
+               list_add_tail(&pio->list, &piwb->ready_data_pios);
+       }
+       kunmap_atomic(to);
+
+       if (bat_update_prepared)
+               ploop_md_make_dirty(ploop, md);
+out:
+       return;
+err:
+       if (bat_update_prepared)
+               ploop_break_bat_update(ploop, md);
+       pio_endio(pio);
+}
+
+static void process_discard_pios(struct ploop *ploop, struct list_head *pios)
+{
+       struct pio *pio;
+
+       while ((pio = pio_list_pop(pios)) != NULL)
+               process_one_discard_pio(ploop, pio);
+}
+
+static void process_resubmit_pios(struct ploop *ploop, struct list_head *pios)
+{
+       struct pio *pio;
+
+       while ((pio = pio_list_pop(pios)) != NULL) {
+               pio->queue_list_id = PLOOP_LIST_INVALID;
+               submit_rw_mapped(ploop, pio);
+       }
+}
+
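+/*
+ * Move dirty md pages from MD_DIRTY to MD_WRITEBACK under bat_rwlock
+ * and submit their batched BAT page writeback.
+ */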
+static void submit_metadata_writeback(struct ploop *ploop)
+{
+       struct md_page *md;
+
+       while (1) {
+               write_lock_irq(&ploop->bat_rwlock);
+               md = list_first_entry_or_null(&ploop->wb_batch_list,
+                               struct md_page, wb_link);
+               if (!md) {
+                       write_unlock_irq(&ploop->bat_rwlock);
+                       break;
+               }
+               list_del_init(&md->wb_link);
+               /* L1L2 mustn't be redirtied while wb is in flight! */
+               WARN_ON_ONCE(!(md->status & MD_DIRTY) ||
+                            (md->status & MD_WRITEBACK));
+               md->status |= MD_WRITEBACK;
+               md->status &= ~MD_DIRTY;
+               write_unlock_irq(&ploop->bat_rwlock);
+
+               ploop_index_wb_submit(ploop, md->piwb);
+       }
+}
+
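+/*
+ * Main worker: drain the per-type pio lists under deferred_lock,
+ * process them (prepare, resubmit, deferred, discard, COW) and
+ * finally submit the batched metadata writeback.
+ */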
+void do_ploop_work(struct work_struct *ws)
+{
+       struct ploop *ploop = container_of(ws, struct ploop, worker);
+       LIST_HEAD(embedded_pios);
+       LIST_HEAD(deferred_pios);
+       LIST_HEAD(discard_pios);
+       LIST_HEAD(cow_pios);
+       LIST_HEAD(resubmit_pios);
+       unsigned int pf_io_thread = (current->flags & PF_IO_THREAD);
+
+       current->flags |= PF_IO_THREAD;
+
+       spin_lock_irq(&ploop->deferred_lock);
+       list_splice_init(&ploop->pios[PLOOP_LIST_PREPARE], &embedded_pios);
+       list_splice_init(&ploop->pios[PLOOP_LIST_DEFERRED], &deferred_pios);
+       list_splice_init(&ploop->pios[PLOOP_LIST_DISCARD], &discard_pios);
+       list_splice_init(&ploop->pios[PLOOP_LIST_COW], &cow_pios);
+       list_splice_init(&ploop->resubmit_pios, &resubmit_pios);
+       spin_unlock_irq(&ploop->deferred_lock);
+
+       prepare_embedded_pios(ploop, &embedded_pios, &deferred_pios);
+
+       process_resubmit_pios(ploop, &resubmit_pios);
+       process_deferred_pios(ploop, &deferred_pios);
+       process_discard_pios(ploop, &discard_pios);
+       process_delta_cow(ploop, &cow_pios);
+
+       submit_metadata_writeback(ploop);
+
+       current->flags = (current->flags & ~PF_IO_THREAD) | pf_io_thread;
+}
+
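+/*
+ * Flush worker: fsync the top delta file once and complete
+ * all pending FLUSH pios with the result.
+ */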
+void do_ploop_fsync_work(struct work_struct *ws)
+{
+       struct ploop *ploop = container_of(ws, struct ploop, fsync_worker);
+       LIST_HEAD(flush_pios);
+       struct file *file;
+       struct pio *pio;
+       int ret;
+
+       spin_lock_irq(&ploop->deferred_lock);
+       list_splice_init(&ploop->pios[PLOOP_LIST_FLUSH], &flush_pios);
+       spin_unlock_irq(&ploop->deferred_lock);
+
+       file = top_delta(ploop)->file;
+       ret = vfs_fsync(file, 0);
+
+       while ((pio = pio_list_pop(&flush_pios)) != NULL) {
+               if (unlikely(ret))
+                       pio->bi_status = errno_to_blk_status(ret);
+               pio_endio(pio);
+       }
+}
+
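+/*
+ * Queue an embedded pio: requests with data go to the PREPARE list of
+ * the main worker, empty FLUSH requests go straight to the fsync worker.
+ * While submitting is stopped, pios are parked in suspended_pios.
+ */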
+static void submit_embedded_pio(struct ploop *ploop, struct pio *pio)
+{
+       struct ploop_rq *prq = pio->endio_cb_data;
+       struct request *rq = prq->rq;
+       struct work_struct *worker;
+       unsigned long flags;
+       bool queue = true;
+
+       if (blk_rq_bytes(rq)) {
+               pio->queue_list_id = PLOOP_LIST_PREPARE;
+               worker = &ploop->worker;
+       } else {
+               WARN_ON_ONCE(pio->bi_op != REQ_OP_FLUSH);
+               pio->queue_list_id = PLOOP_LIST_FLUSH;
+               worker = &ploop->fsync_worker;
+       }
+
+       spin_lock_irqsave(&ploop->deferred_lock, flags);
+       if (unlikely(ploop->stop_submitting_pios)) {
+               list_add_tail(&pio->list, &ploop->suspended_pios);
+               queue = false;
+               goto unlock;
+       }
+
+       inc_nr_inflight(ploop, pio);
+       list_add_tail(&pio->list, &ploop->pios[pio->queue_list_id]);
+unlock:
+       spin_unlock_irqrestore(&ploop->deferred_lock, flags);
+
+       if (queue)
+               queue_work(ploop->wq, worker);
+}
+
+void submit_embedded_pios(struct ploop *ploop, struct list_head *list)
+{
+       struct pio *pio;
+
+       while ((pio = pio_list_pop(list)) != NULL)
+               submit_embedded_pio(ploop, pio);
+}
+
+int ploop_clone_and_map(struct dm_target *ti, struct request *rq,
+                   union map_info *info, struct request **clone)
+{
+       struct ploop *ploop = ti->private;
+       struct ploop_rq *prq;
+       struct pio *pio;
+
+       if (blk_rq_bytes(rq) && ploop_rq_valid(ploop, rq) < 0)
+               return DM_MAPIO_KILL;
+
+       prq = mempool_alloc(ploop->prq_pool, GFP_ATOMIC);
+       if (!prq)
+               return DM_MAPIO_KILL;
+       pio = (void *)prq + sizeof(*prq);
+
+       init_prq_and_embedded_pio(ploop, rq, prq, pio);
+
+       submit_embedded_pio(ploop, pio);
+       return DM_MAPIO_SUBMITTED;
+}
+
+static void handle_cleanup(struct ploop *ploop, struct pio *pio)
+{
+       /*
+        * This function is called from the very beginning
+        * of call_bio_endio().
+        */
+       if (pio->wants_discard_index_cleanup)
+               ploop_discard_index_pio_end(ploop, pio);
+
+       unlink_completed_pio(ploop, pio);
+       dec_nr_inflight(ploop, pio);
+}
+
+/*
+ * Prepare a simple index writeback without attached data bios.
+ * If @dst_clu is passed, this tries to allocate another index
+ * instead of the existing one. In that case, managing the old
+ * bat_entries[@clu] and the related holes_bitmap bit is the
+ * caller's duty.
+ */
+int ploop_prepare_reloc_index_wb(struct ploop *ploop,
+                                struct md_page **ret_md,
+                                u32 clu, u32 *dst_clu)
+{
+       enum piwb_type type = PIWB_TYPE_ALLOC;
+       u32 page_id = bat_clu_to_page_nr(clu);
+       struct md_page *md = md_page_find(ploop, page_id);
+       struct ploop_index_wb *piwb;
+       int err;
+
+       if (dst_clu)
+               type = PIWB_TYPE_RELOC;
+
+       if ((md->status & (MD_DIRTY|MD_WRITEBACK)) ||
+           ploop_prepare_bat_update(ploop, md, type)) {
+               err = -EIO;
+               goto out_error;
+       }
+
+       piwb = md->piwb;
+
+       if (dst_clu) {
+               /*
+                * For ploop_advance_local_after_bat_wb(): do not worry
+                * that bat_entries[@clu] is set. Zero bat_page[@clu],
+                * to make ploop_alloc_cluster() allocate a new dst_clu from
+                * holes_bitmap.
+                */
+               ploop_bat_page_zero_cluster(ploop, piwb, clu);
+               err = ploop_alloc_cluster(ploop, piwb, clu, dst_clu);
+               if (err)
+                       goto out_reset;
+       }
+
+       *ret_md = md;
+       return 0;
+
+out_reset:
+       ploop_break_bat_update(ploop, md);
+out_error:
+       return errno_to_blk_status(err);
+}
diff --git a/drivers/md/dm-ploop-target.c b/drivers/md/dm-ploop-target.c
new file mode 100644
index 000000000000..ec0efddef2ac
--- /dev/null
+++ b/drivers/md/dm-ploop-target.c
@@ -0,0 +1,573 @@
+// SPDX-License-Identifier: GPL-2.0-only
+
+/*
+ *  drivers/md/dm-ploop-target.c
+ *
+ *  Copyright (c) 2020-2021 Virtuozzo International GmbH. All rights reserved.
+ *
+ */
+
+#include "dm.h"
+#include <linux/buffer_head.h>
+#include <linux/rbtree.h>
+#include <linux/dm-io.h>
+#include <linux/dm-kcopyd.h>
+#include <linux/init.h>
+#include <linux/module.h>
+#include <linux/file.h>
+#include <linux/slab.h>
+#include <linux/vmalloc.h>
+#include <linux/uio.h>
+#include "dm-ploop.h"
+
+#define DM_MSG_PREFIX "ploop"
+
+bool ignore_signature_disk_in_use = false; /* For development purposes */
+module_param(ignore_signature_disk_in_use, bool, 0444);
+MODULE_PARM_DESC(ignore_signature_disk_in_use,
+                "Do not check for SIGNATURE_DISK_IN_USE");
+
+static struct kmem_cache *prq_cache;
+static struct kmem_cache *pio_cache;
+struct kmem_cache *cow_cache;
+
+static void ploop_aio_do_completion(struct pio *pio)
+{
+       if (!atomic_dec_and_test(&pio->aio_ref))
+               return;
+       pio->complete(pio);
+}
+
+static void ploop_aio_complete(struct kiocb *iocb, long ret, long ret2)
+{
+       struct pio *pio;
+
+       pio = container_of(iocb, struct pio, iocb);
+
+       WARN_ON_ONCE(ret > INT_MAX);
+       pio->ret = (int)ret;
+       ploop_aio_do_completion(pio);
+}
+
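+/*
+ * Start an async read/write of @iter against the delta file. aio_ref is
+ * taken twice: once for this submitting context and once for
+ * ->ki_complete(); pio->complete() is called when both are dropped.
+ * If the iocb did not go asynchronous, complete it in place.
+ */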
+void ploop_call_rw_iter(struct file *file, loff_t pos, unsigned rw,
+                       struct iov_iter *iter, struct pio *pio)
+{
+       struct kiocb *iocb = &pio->iocb;
+       int ret;
+
+       iocb->ki_pos = pos;
+       iocb->ki_filp = file;
+       iocb->ki_complete = ploop_aio_complete;
+       iocb->ki_flags = IOCB_DIRECT;
+       iocb->ki_ioprio = IOPRIO_PRIO_VALUE(IOPRIO_CLASS_NONE, 0);
+
+       atomic_set(&pio->aio_ref, 2);
+
+       if (rw == WRITE)
+               ret = call_write_iter(file, iocb, iter);
+       else
+               ret = call_read_iter(file, iocb, iter);
+
+       ploop_aio_do_completion(pio);
+
+       if (ret != -EIOCBQUEUED)
+               iocb->ki_complete(iocb, ret, 0);
+}
+
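+/*
+ * Synchronously read or write one page at byte offset @index << PAGE_SHIFT.
+ * Returns 0 only if the whole PAGE_SIZE was transferred.
+ */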
+int ploop_rw_page_sync(unsigned rw, struct file *file,
+                      u64 index, struct page *page)
+{
+       struct bio_vec *bvec, bvec_on_stack;
+       struct iov_iter iter;
+       ssize_t ret;
+       loff_t pos;
+
+       BUG_ON(rw != READ && rw != WRITE);
+
+       bvec = &bvec_on_stack;
+       bvec->bv_page = page;
+       bvec->bv_len = PAGE_SIZE;
+       bvec->bv_offset = 0;
+
+       iov_iter_bvec(&iter, rw, bvec, 1, PAGE_SIZE);
+       pos = index << PAGE_SHIFT;
+
+       if (rw == READ)
+               ret = vfs_iter_read(file, &iter, &pos, 0);
+       else
+               ret = vfs_iter_write(file, &iter, &pos, 0);
+
+       if (ret == PAGE_SIZE)
+               ret = 0;
+       else if (ret >= 0)
+               ret = -ENODATA;
+
+       return ret;
+}
+
+static void inflight_bios_ref_exit0(struct percpu_ref *ref)
+{
+       struct ploop *ploop = container_of(ref, struct ploop,
+                                          inflight_bios_ref[0]);
+       complete(&ploop->inflight_bios_ref_comp);
+}
+
+static void inflight_bios_ref_exit1(struct percpu_ref *ref)
+{
+       struct ploop *ploop = container_of(ref, struct ploop,
+                                          inflight_bios_ref[1]);
+       complete(&ploop->inflight_bios_ref_comp);
+}
+
+void free_md_pages_tree(struct rb_root *root)
+{
+       struct rb_node *node;
+       struct md_page *md;
+
+       while ((node = root->rb_node) != NULL) {
+               md = rb_entry(node, struct md_page, node);
+               rb_erase(node, root);
+               ploop_free_md_page(md);
+       }
+}
+
+static bool ploop_has_pending_activity(struct ploop *ploop)
+{
+       bool has = false;
+       int i;
+
+       spin_lock_irq(&ploop->deferred_lock);
+       for (i = 0; i < PLOOP_LIST_COUNT; i++)
+               has |= !list_empty(&ploop->pios[i]);
+       spin_unlock_irq(&ploop->deferred_lock);
+
+       return has;
+}
+
+static bool ploop_empty_htable(struct hlist_head head[])
+{
+       int i;
+
+       for (i = 0; i < PLOOP_HASH_TABLE_SIZE; i++)
+               if (!hlist_empty(&head[i]))
+                       return false;
+
+       return true;
+}
+
+static void ploop_destroy(struct ploop *ploop)
+{
+       int i;
+
+       if (ploop->wq) {
+               flush_workqueue(ploop->wq);
+               destroy_workqueue(ploop->wq);
+               WARN_ON_ONCE(ploop_has_pending_activity(ploop));
+       }
+       for (i = 0; i < 2; i++)
+               percpu_ref_exit(&ploop->inflight_bios_ref[i]);
+       /* Nobody uses it after destroy_workqueue() */
+       while (ploop->nr_deltas-- > 0) {
+               if (ploop->deltas[ploop->nr_deltas].file)
+                       fput(ploop->deltas[ploop->nr_deltas].file);
+       }
+       WARN_ON(!ploop_empty_htable(ploop->exclusive_pios));
+       WARN_ON(!ploop_empty_htable(ploop->inflight_pios));
+       kfree(ploop->inflight_pios);
+       kfree(ploop->exclusive_pios);
+       mempool_destroy(ploop->pio_pool);
+       mempool_destroy(ploop->prq_pool);
+       kfree(ploop->deltas);
+       kvfree(ploop->holes_bitmap);
+       kvfree(ploop->tracking_bitmap);
+       free_md_pages_tree(&ploop->bat_entries);
+       kfree(ploop);
+}
+
+static struct file *get_delta_file(int fd)
+{
+       struct file *file;
+
+       file = fget(fd);
+       if (!file)
+               return ERR_PTR(-ENOENT);
+       if (!(file->f_mode & FMODE_READ)) {
+               fput(file);
+               return ERR_PTR(-EBADF);
+       }
+
+       return file;
+}
+
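+/*
+ * Check that the top delta file grants the access mode required by the
+ * dm table, then read its header page, set up the metadata and
+ * preallocate md pages for the whole BAT.
+ */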
+static int check_top_delta(struct ploop *ploop, struct file *file)
+{
+       struct dm_target *ti = ploop->ti;
+       struct page *page = NULL;
+       fmode_t mode;
+       int ret;
+
+       mode = dm_table_get_mode(ti->table);
+       mode &= (FMODE_READ|FMODE_WRITE);
+
+       ret = -EACCES;
+        if (mode & ~(file->f_mode & (FMODE_READ|FMODE_WRITE)))
+               goto out;
+
+       /* Prealloc a page to read hdr */
+       ret = -ENOMEM;
+       page = alloc_page(GFP_KERNEL);
+       if (!page)
+               goto out;
+
+       ret = ploop_rw_page_sync(READ, file, 0, page);
+       if (ret < 0)
+               goto out;
+
+       ret = ploop_setup_metadata(ploop, page);
+       if (ret)
+               goto out;
+
+       ret = prealloc_md_pages(&ploop->bat_entries, 0, ploop->nr_bat_entries);
+       if (ret)
+               goto out;
+out:
+       if (page)
+               put_page(page);
+       return ret;
+}
+
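+/*
+ * Parse delta file descriptors from the last argument (top delta)
+ * down to the first (bottom delta); only the bottom delta may be
+ * prefixed with "raw@".
+ */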
+static int ploop_add_deltas_stack(struct ploop *ploop, char **argv, int argc)
+{
+       struct ploop_delta *deltas;
+       int i, delta_fd, ret;
+       struct file *file;
+       const char *arg;
+       bool is_raw;
+
+       ret = -EINVAL;
+       if (argc < 1)
+               goto out;
+       if (argc > BAT_LEVEL_MAX - 1)
+               goto out;
+
+       ret = -ENOMEM;
+       deltas = kcalloc(argc, sizeof(*deltas), GFP_KERNEL);
+       if (!deltas)
+               goto out;
+       ploop->deltas = deltas;
+       ploop->nr_deltas = argc;
+
+       for (i = argc - 1; i >= 0; i--) {
+               ret = -EINVAL;
+               arg = argv[i];
+               is_raw = false;
+               if (strncmp(arg, "raw@", 4) == 0) {
+                       if (i != 0)
+                               goto out;
+                       arg += 4;
+                       is_raw = true;
+               }
+               if (kstrtos32(arg, 10, &delta_fd) < 0)
+                       goto out;
+
+               file = get_delta_file(delta_fd);
+               if (IS_ERR(file)) {
+                       ret = PTR_ERR(file);
+                       goto out;
+               }
+
+               if (i == argc - 1) { /* Top delta */
+                       ret = check_top_delta(ploop, file);
+                       if (ret)
+                               goto err_fput;
+               }
+
+               ret = ploop_add_delta(ploop, i, file, is_raw);
+               if (ret < 0)
+                       goto err_fput;
+       }
+
+       ret = 0;
+out:
+       return ret;
+err_fput:
+       fput(file);
+       goto out;
+}
+
+#define EAT_ARG(argc, argv)                                    \
+       do {                                                    \
+               BUILD_BUG_ON(sizeof(argc) != sizeof(int));      \
+               argc--;                                         \
+               argv++;                                         \
+       } while (0)
+/*
+ * Arguments: <cluster_log> [falloc_new_clu] <delta fds, bottom to top>
+ */
+static int ploop_ctr(struct dm_target *ti, unsigned int argc, char **argv)
+{
+       percpu_ref_func_t *release;
+       struct ploop *ploop;
+       unsigned int flags;
+       int i, ret;
+
+       if (argc < 2)
+               return -EINVAL;
+
+       ploop = kzalloc(sizeof(*ploop), GFP_KERNEL);
+       if (!ploop)
+               return -ENOMEM;
+
+       ploop->prq_pool = mempool_create_slab_pool(PLOOP_PRQ_POOL_SIZE,
+                                                  prq_cache);
+       ploop->pio_pool = mempool_create_slab_pool(PLOOP_PIO_POOL_SIZE,
+                                                  pio_cache);
+       ploop->exclusive_pios = kcalloc(PLOOP_HASH_TABLE_SIZE,
+                                       sizeof(struct hlist_head),
+                                       GFP_KERNEL);
+       ploop->inflight_pios = kcalloc(PLOOP_HASH_TABLE_SIZE,
+                                       sizeof(struct hlist_head),
+                                       GFP_KERNEL);
+       if (!ploop->prq_pool || !ploop->pio_pool ||
+           !ploop->exclusive_pios || !ploop->inflight_pios) {
+               ret = -ENOMEM;
+               goto err;
+       }
+
+       rwlock_init(&ploop->bat_rwlock);
+       spin_lock_init(&ploop->err_status_lock);
+       init_rwsem(&ploop->ctl_rwsem);
+       init_waitqueue_head(&ploop->service_wq);
+       spin_lock_init(&ploop->inflight_lock);
+       spin_lock_init(&ploop->deferred_lock);
+
+       INIT_LIST_HEAD(&ploop->suspended_pios);
+
+       for (i = 0; i < PLOOP_LIST_COUNT; i++)
+               INIT_LIST_HEAD(&ploop->pios[i]);
+
+       INIT_LIST_HEAD(&ploop->resubmit_pios);
+       INIT_LIST_HEAD(&ploop->enospc_pios);
+       INIT_LIST_HEAD(&ploop->cluster_lk_list);
+       INIT_LIST_HEAD(&ploop->wb_batch_list);
+       ploop->bat_entries = RB_ROOT;
+       timer_setup(&ploop->enospc_timer, ploop_enospc_timer, 0);
+
+       INIT_WORK(&ploop->worker, do_ploop_work);
+       INIT_WORK(&ploop->fsync_worker, do_ploop_fsync_work);
+       INIT_WORK(&ploop->event_work, ploop_event_work);
+       init_completion(&ploop->inflight_bios_ref_comp);
+
+       for (i = 0; i < 2; i++) {
+               release = i ? inflight_bios_ref_exit1 : inflight_bios_ref_exit0;
+               if (percpu_ref_init(&ploop->inflight_bios_ref[i], release,
+                                   PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) {
+                       ret = -ENOMEM;
+                       goto err;
+               }
+       }
+
+       flags = WQ_MEM_RECLAIM|WQ_HIGHPRI|WQ_UNBOUND;
+       ploop->wq = alloc_workqueue("dm-" DM_MSG_PREFIX, flags, 0);
+       if (!ploop->wq) {
+               ret = -ENOMEM;
+               goto err;
+       }
+
+       ti->private = ploop;
+       ploop->ti = ti;
+
+       if (kstrtou32(argv[0], 10, &ploop->cluster_log) < 0) {
+               ret = -EINVAL;
+               ti->error = "could not parse cluster_log";
+               goto err;
+       }
+       EAT_ARG(argc, argv);
+       ret = dm_set_target_max_io_len(ti, CLU_TO_SEC(ploop, 1));
+       if (ret) {
+               ti->error = "could not set max_io_len";
+               goto err;
+       }
+
+       /* Optional parameter */
+       if (strcmp(argv[0], "falloc_new_clu") == 0) {
+               if (argc < 2) {
+                       ret = -EINVAL;
+                       goto err;
+               }
+               ploop->falloc_new_clu = true;
+               EAT_ARG(argc, argv);
+       }
+
+       ret = ploop_add_deltas_stack(ploop, &argv[0], argc);
+       if (ret)
+               goto err;
+
+       ti->num_flush_bios = 1;
+       ti->flush_supported = true;
+       ti->num_discard_bios = 1;
+       ti->discards_supported = true;
+       return 0;
+
+err:
+       ploop_destroy(ploop);
+       return ret;
+}
+
+static void ploop_dtr(struct dm_target *ti)
+{
+       ploop_destroy(ti->private);
+}
+
+static void ploop_io_hints(struct dm_target *ti, struct queue_limits *limits)
+{
+       struct ploop *ploop = ti->private;
+
+       limits->max_discard_sectors = CLU_TO_SEC(ploop, 1);
+       limits->max_hw_discard_sectors = CLU_TO_SEC(ploop, 1);
+       limits->discard_granularity = CLU_SIZE(ploop);
+       limits->discard_alignment = 0;
+       limits->discard_misaligned = 0;
+}
+
+static void ploop_status(struct dm_target *ti, status_type_t type,
+                        unsigned int status_flags, char *result,
+                        unsigned int maxlen)
+{
+       struct ploop *ploop = ti->private;
+       char stat[16] = { 0 }, *p = stat;
+       ssize_t sz = 0;
+
+       down_read(&ploop->ctl_rwsem);
+       if (ploop->falloc_new_clu)
+               p += sprintf(p, "f");
+       if (ploop->tracking_bitmap)
+               p += sprintf(p, "t");
+       if (READ_ONCE(ploop->noresume))
+               p += sprintf(p, "n");
+       if (p == stat)
+               p += sprintf(p, "o");
+       up_read(&ploop->ctl_rwsem);
+
+       BUG_ON(p - stat >= sizeof(stat));
+       DMEMIT("%u v2 %u %s", ploop->nr_deltas, (u32)CLU_TO_SEC(ploop, 1), stat);
+}
+
+static void ploop_set_wants_suspend(struct dm_target *ti, bool wants)
+{
+       struct ploop *ploop = ti->private;
+
+       spin_lock_irq(&ploop->deferred_lock);
+       ploop->wants_suspend = wants;
+       spin_unlock_irq(&ploop->deferred_lock);
+}
+static void ploop_set_suspended(struct dm_target *ti, bool suspended)
+{
+       struct ploop *ploop = ti->private;
+
+       down_write(&ploop->ctl_rwsem);
+       ploop->suspended = suspended;
+       up_write(&ploop->ctl_rwsem);
+}
+
+static void ploop_presuspend(struct dm_target *ti)
+{
+       struct ploop *ploop = ti->private;
+       /*
+        * For pending enospc requests. Otherwise,
+        * we may never be able to suspend this target.
+        */
+       ploop_set_wants_suspend(ti, true);
+       flush_work(&ploop->event_work);
+       del_timer_sync(&ploop->enospc_timer);
+       ploop_enospc_timer(&ploop->enospc_timer);
+}
+static void ploop_presuspend_undo(struct dm_target *ti)
+{
+       ploop_set_wants_suspend(ti, false);
+}
+static void ploop_postsuspend(struct dm_target *ti)
+{
+       ploop_set_suspended(ti, true);
+}
+static int ploop_preresume(struct dm_target *ti)
+{
+       struct ploop *ploop = ti->private;
+       int ret = 0;
+
+       down_read(&ploop->ctl_rwsem);
+       if (ploop->noresume)
+               ret = -EAGAIN;
+       up_read(&ploop->ctl_rwsem);
+
+       if (ret == 0) {
+               /*
+                * We are a singleton target. There will be
+                * no more reasons to break resume.
+                */
+               ploop_set_suspended(ti, false);
+               ploop_set_wants_suspend(ti, false);
+       }
+       return ret;
+}
+
+/*----------------------------------------------------------------*/
+
+static struct target_type ploop_target = {
+       .name = "ploop",
+       .version = {1, 0, 0},
+       .features = DM_TARGET_SINGLETON,
+       .module = THIS_MODULE,
+       .ctr = ploop_ctr,
+       .dtr = ploop_dtr,
+       .message = ploop_message,
+       .io_hints = ploop_io_hints,
+       .presuspend = ploop_presuspend,
+       .presuspend_undo = ploop_presuspend_undo,
+       .postsuspend = ploop_postsuspend,
+       .preresume = ploop_preresume,
+       .clone_and_map_rq = ploop_clone_and_map,
+       .status = ploop_status,
+};
+
+static int __init dm_ploop_init(void)
+{
+       int r = -ENOMEM;
+
+       /* This saves some memory compared with a plain kmalloc-backed cache */
+       prq_cache = kmem_cache_create("ploop-prq", sizeof(struct ploop_rq) +
+                                     sizeof(struct pio), 0, 0, NULL);
+       pio_cache = kmem_cache_create("ploop-pio", sizeof(struct pio),
+                                     0, 0, NULL);
+       cow_cache = kmem_cache_create("ploop-cow", sizeof(struct ploop_cow),
+                                     0, 0, NULL);
+       if (!prq_cache || !pio_cache || !cow_cache)
+               goto err;
+
+       r = dm_register_target(&ploop_target);
+       if (r) {
+               DMERR("ploop target registration failed: %d", r);
+               goto err;
+       }
+
+       return 0;
+err:
+       kmem_cache_destroy(prq_cache);
+       kmem_cache_destroy(pio_cache);
+       kmem_cache_destroy(cow_cache);
+       return r;
+}
+
+static void __exit dm_ploop_exit(void)
+{
+       dm_unregister_target(&ploop_target);
+       kmem_cache_destroy(prq_cache);
+       kmem_cache_destroy(pio_cache);
+       kmem_cache_destroy(cow_cache);
+}
+
+module_init(dm_ploop_init);
+module_exit(dm_ploop_exit);
+
+MODULE_AUTHOR("Kirill Tkhai <ktk...@virtuozzo.com>");
+MODULE_LICENSE("GPL");
diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h
new file mode 100644
index 000000000000..8de2a28b2dec
--- /dev/null
+++ b/drivers/md/dm-ploop.h
@@ -0,0 +1,581 @@
+/*
+ *  drivers/md/dm-ploop.h
+ *
+ *  Copyright (c) 2020-2021 Virtuozzo International GmbH. All rights reserved.
+ *
+ */
+
+#ifndef __DM_PLOOP_H
+#define __DM_PLOOP_H
+
+#include <linux/device-mapper.h>
+#include <linux/bio.h>
+
+#define PLOOP_MAP_OFFSET 16
+typedef u32 map_index_t;
+#define BAT_ENTRIES_PER_PAGE (PAGE_SIZE / sizeof(map_index_t))
+
+#define SIGNATURE_DISK_IN_USE           0x746F6E59
+
+#pragma pack(push, 1)
+struct ploop_pvd_header {
+       __u8  m_Sig[16];        /* Signature */
+       __u32 m_Type;           /* Disk type */
+       __u32 m_Heads;          /* heads count */
+       __u32 m_Cylinders;      /* tracks count */
+       __u32 m_Sectors;        /* Sectors per track count */
+       __u32 m_Size;           /* Size of disk in tracks */
+       union {                 /* Size of disk in 512-byte sectors */
+               struct {
+                       __u32 m_SizeInSectors_v1;
+                       __u32 Unused;
+               };
+               __u64 m_SizeInSectors_v2;
+       };
+       __u32 m_DiskInUse;      /* Disk in use */
+       __u32 m_FirstBlockOffset; /* First data block offset (in sectors) */
+       __u32 m_Flags;          /* Misc flags */
+       __u8  m_Reserved[8];    /* Reserved */
+};
+#pragma pack(pop)
+
+struct ploop_delta {
+       struct file *file;
+       loff_t file_size;
+       loff_t file_preallocated_area_start;
+       u32 size_in_clus;
+       bool is_raw;
+};
+
+#define MERGE_PIOS_MAX                 64
+
+struct ploop_cmd {
+       union {
+               struct {
+                       sector_t new_sectors;
+                       /* Preallocated data */
+                       struct rb_root md_pages_root;
+                       struct md_page *md0;
+                       void *holes_bitmap;
+#define PLOOP_GROW_STAGE_INITIAL       0
+                       unsigned int stage;
+                       unsigned int nr_bat_entries;
+                       unsigned int hb_nr;
+                       unsigned int end_dst_clu;
+                       unsigned int nr_old_bat_clu;
+                       unsigned int clu, dst_clu;
+                       struct pio *pio;
+               } resize;
+       };
+};
+
+#define PAGE_NR_NONE           U32_MAX
+/* We can't use 0 for unmapped clusters, since RAW image references 0 clu */
+#define BAT_ENTRY_NONE         U32_MAX
+
+#define PLOOP_INFLIGHT_TIMEOUT (60 * HZ)
+#define PLOOP_ENOSPC_TIMEOUT   (20 * HZ)
+
+#define PLOOP_BIOS_HTABLE_BITS 8
+#define PLOOP_BIOS_HTABLE_SIZE (1 << PLOOP_BIOS_HTABLE_BITS)
+#define CLU_OFF(ploop, pos) (pos & (to_bytes(1 << ploop->cluster_log) - 1))
+#define CLU_TO_POS(ploop, clu) to_bytes((loff_t)clu << ploop->cluster_log)
+#define POS_TO_CLU(ploop, pos) (to_sector(pos) >> ploop->cluster_log)
+#define SEC_TO_CLU(ploop, sec) (sec >> ploop->cluster_log)
+#define CLU_TO_SEC(ploop, clu) ((sector_t)clu << ploop->cluster_log)
+#define CLU_SIZE(ploop) to_bytes((u32)1 << ploop->cluster_log)
+
+enum piwb_type {
+       PIWB_TYPE_ALLOC = 0,    /* Allocation of new clusters */
+       PIWB_TYPE_RELOC,        /* Relocation of clu (on BAT grow) */
+       PIWB_TYPE_DISCARD,      /* Zeroing index on discard */
+};
+
+struct ploop_index_wb {
+       struct ploop *ploop;
+       struct completion *comp;
+       blk_status_t *comp_bi_status;
+       enum piwb_type type;
+       spinlock_t lock;
+       struct md_page *md;
+       struct pio *pio;
+       struct page *bat_page;
+       struct list_head ready_data_pios;
+       struct list_head cow_list;
+       atomic_t count;
+       bool completed;
+       blk_status_t bi_status;
+       u32 page_id;
+};
+
+/* Metadata page */
+struct md_page {
+       struct rb_node node;
+       u32 id; /* Number of this page starting from hdr */
+#define MD_DIRTY       (1U << 1) /* Page contains changes and wants writeback */
+#define MD_WRITEBACK   (1U << 2) /* Writeback was submitted */
+       unsigned int status;
+       struct page *page;
+       u8 *bat_levels;
+       struct list_head wait_list;
+
+       struct list_head wb_link;
+       struct ploop_index_wb *piwb;
+};
+
+enum {
+       PLOOP_LIST_PREPARE = 0, /* List for initial preparation and splitting
+                                * embedded pios related to prq */
+       PLOOP_LIST_DEFERRED,
+       PLOOP_LIST_FLUSH,
+       PLOOP_LIST_DISCARD,
+       PLOOP_LIST_COW,
+
+       PLOOP_LIST_COUNT,
+       PLOOP_LIST_INVALID = PLOOP_LIST_COUNT,
+};
+
+struct ploop {
+       struct dm_target *ti;
+#define PLOOP_PRQ_POOL_SIZE 512 /* Twice nr_requests from blk_mq_init_sched() */
+       mempool_t *prq_pool;
+#define PLOOP_PIO_POOL_SIZE 256
+       mempool_t *pio_pool;
+
+       struct rb_root bat_entries;
+       struct ploop_delta *deltas;
+       u8 nr_deltas;
+       bool falloc_new_clu; /* fallocate() instead of truncate() */
+       u32 nr_bat_entries;
+       unsigned int cluster_log; /* In sectors */
+
+       u8 m_Sig[16]; /* Signature */
+       u32 m_Type; /* Disk type */
+       u32 m_Sectors; /* Sectors per clu */
+
+       /*
+        * Absolute values from start of file. BAT-related clusters
+        * are also included, and their bits must be zeroed.
+        */
+       void *holes_bitmap; /* Clearing a bit occurs from kwork only */
+       u32 hb_nr; /* holes_bitmap size in bits */
+       rwlock_t bat_rwlock;
+
+       struct list_head wb_batch_list;
+
+       void *tracking_bitmap;
+       unsigned int tb_nr; /* tracking_bitmap size in bits */
+       unsigned int tb_cursor;
+
+       /*
+        * Hash table to link non-exclusive submitted bios.
+        * This is needed so discard can check that nobody
+        * uses the clu being discarded.
+        */
+       struct hlist_head *inflight_pios;
+       /*
+        * Hash table to link exclusive submitted bios.
+        * This allows delaying bios targeting some clu.
+        */
+       struct hlist_head *exclusive_pios;
+
+       struct workqueue_struct *wq;
+       struct work_struct worker;
+       struct work_struct fsync_worker;
+       struct work_struct event_work;
+
+       struct completion inflight_bios_ref_comp;
+       struct percpu_ref inflight_bios_ref[2];
+       bool inflight_ref_comp_pending;
+       unsigned int inflight_bios_ref_index:1;
+
+       struct list_head suspended_pios;
+       bool stop_submitting_pios;
+
+       spinlock_t inflight_lock;
+       spinlock_t deferred_lock;
+
+       struct list_head pios[PLOOP_LIST_COUNT];
+
+       struct list_head resubmit_pios; /* After partial IO */
+       struct list_head enospc_pios; /* Delayed after ENOSPC */
+
+       atomic_t service_pios;
+       struct wait_queue_head service_wq;
+
+       spinlock_t err_status_lock;
+       struct rw_semaphore ctl_rwsem;
+
+       /*
+        * List of locked clusters (no write is possible).
+        * TODO: make @cluster_lk_list a hash table or something similar.
+        */
+       struct list_head cluster_lk_list;
+
+       /* Resume is prohibited */
+       bool noresume;
+       /* Device is suspended */
+       bool suspended;
+       /* Device wants suspend */
+       bool wants_suspend;
+
+       /* Maintenance in progress */
+       bool maintaince;
+
+       struct timer_list enospc_timer;
+       bool event_enospc;
+};
+
+struct ploop_rq {
+       struct request *rq;
+       struct bio_vec *bvec;
+       struct cgroup_subsys_state *css;
+};
+
+struct pio;
+typedef void (*ploop_endio_t)(struct pio *, void *, blk_status_t);
+
+struct pio {
+       struct ploop *ploop;
+       struct cgroup_subsys_state *css;
+
+       struct list_head list;
+       struct hlist_node hlist_node;
+       /* List of pios, which will be queued from this pio end */
+       struct list_head endio_list;
+
+       struct bvec_iter        bi_iter;
+       struct bio_vec          *bi_io_vec;
+       unsigned int            bi_op;
+       unsigned int            bi_vcnt;
+       blk_status_t bi_status;
+       atomic_t remaining;
+
+       ploop_endio_t endio_cb;
+       void *endio_cb_data;
+
+       u32 clu;
+       u8 level;
+
+       bool is_data_alloc:1;
+       bool wants_discard_index_cleanup:1;
+       bool is_fake_merge:1;
+       bool free_on_endio:1;
+       /*
+        * 0 and 1 are related to inflight_bios_ref[],
+        * 2 means index is not assigned.
+        */
+#define PLOOP_REF_INDEX_INVALID        2
+       unsigned int ref_index:2;
+
+       u8 queue_list_id:3; /* id in ploop->pios */
+
+       struct ploop_index_wb *piwb;
+
+       struct kiocb iocb;
+       atomic_t aio_ref;
+       int ret; /* iocb result */
+       void (*complete)(struct pio *me);
+       void *data;
+};
+
+/* Delta COW private */
+struct ploop_cow {
+       struct ploop *ploop;
+       struct pio *aux_pio;
+       u32 dst_clu;
+
+       struct pio *cow_pio;
+};
+
+extern bool ignore_signature_disk_in_use;
+extern struct kmem_cache *cow_cache;
+
+#define rb_root_for_each_md_page(rb_root, md, node)    \
+       for (node = rb_first(rb_root),                  \
+            md = rb_entry(node, struct md_page, node); \
+            node != NULL;                              \
+            node = rb_next(node),                      \
+            md = rb_entry(node, struct md_page, node))
+
+#define ploop_for_each_md_page(ploop, md, node)        \
+       rb_root_for_each_md_page(&ploop->bat_entries, md, node)
+
+static inline bool ploop_is_ro(struct ploop *ploop)
+{
+       return (dm_table_get_mode(ploop->ti->table) & FMODE_WRITE) == 0;
+}
+
+static inline void remap_to_cluster(struct ploop *ploop, struct pio *pio, u32 clu)
+{
+       pio->bi_iter.bi_sector &= ((1 << ploop->cluster_log) - 1);
+       pio->bi_iter.bi_sector |= (clu << ploop->cluster_log);
+}
+
+static inline bool whole_cluster(struct ploop *ploop, struct pio *pio)
+{
+       sector_t end_sector = bvec_iter_end_sector(pio->bi_iter);
+
+       if (pio->bi_iter.bi_size != CLU_SIZE(ploop))
+               return false;
+       /*
+        * There is no sacred meaning in bio_end_sector(),
+        * it's just a suitable and existing primitive.
+        */
+       return !(end_sector & ((1 << ploop->cluster_log) - 1));
+}
+
+#define BAT_LEVEL_MAX          (U8_MAX - 1)
+#define BAT_LEVEL_INVALID      U8_MAX
+static inline u8 top_level(struct ploop *ploop)
+{
+       return ploop->nr_deltas - 1;
+}
+
+static inline struct ploop_delta *top_delta(struct ploop *ploop)
+{
+       return &ploop->deltas[top_level(ploop)];
+}
+
+static inline void ploop_hole_set_bit(unsigned long nr, struct ploop *ploop)
+{
+       if (!WARN_ON_ONCE(nr >= ploop->hb_nr))
+               set_bit(nr, ploop->holes_bitmap);
+}
+
+static inline void ploop_hole_clear_bit(u32 nr, struct ploop *ploop)
+{
+       if (!WARN_ON_ONCE(nr >= ploop->hb_nr))
+               clear_bit(nr, ploop->holes_bitmap);
+}
+
+static inline unsigned int nr_pages_in_cluster(struct ploop *ploop)
+{
+       return 1 << (ploop->cluster_log + 9 - PAGE_SHIFT);
+}
+
+/* Get the number of clusters occupied by the hdr and BAT */
+static inline unsigned int ploop_nr_bat_clusters(struct ploop *ploop,
+                                                u32 nr_bat_entries)
+{
+       unsigned long size, bat_clusters;
+
+       size = (PLOOP_MAP_OFFSET + nr_bat_entries) * sizeof(map_index_t);
+       bat_clusters = DIV_ROUND_UP(size, CLU_SIZE(ploop));
+
+       return bat_clusters;
+}
+
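+/* Number of the md page holding the BAT entry of @clu (hdr entries included in the offset) */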
+static inline u32 bat_clu_to_page_nr(u32 clu)
+{
+       u64 byte;
+
+       byte = (clu + PLOOP_MAP_OFFSET) * sizeof(map_index_t);
+       return byte >> PAGE_SHIFT;
+}
+
+static inline u32 bat_clu_idx_in_page(u32 clu)
+{
+       return (clu + PLOOP_MAP_OFFSET) % (PAGE_SIZE / sizeof(map_index_t));
+}
+
+static inline u32 page_clu_idx_to_bat_clu(u32 page_id, u32 cluster_rel)
+{
+       unsigned int off;
+       off = (u64)page_id * PAGE_SIZE / sizeof(map_index_t) - PLOOP_MAP_OFFSET;
+       return off + cluster_rel;
+}
+
+static inline struct md_page *md_first_entry(struct rb_root *md_root)
+{
+       struct rb_node *node = rb_first(md_root);
+       return rb_entry(node, struct md_page, node);
+}
+static inline struct md_page *md_next_entry(struct md_page *md)
+{
+       return rb_entry(rb_next(&md->node), struct md_page, node);
+}
+
+
+extern struct md_page * md_page_find(struct ploop *ploop, u32 id);
+
+/*
+ * This should be called in very rare cases. Avoid this function
+ * in loops over clu; use ploop_for_each_md_page()-based
+ * iterations instead.
+ */
+static inline u32 ploop_bat_entries(struct ploop *ploop, u32 clu,
+                         u8 *bat_level, struct md_page **md_ret)
+{
+       u32 *bat_entries, dst_clu, id;
+       struct md_page *md;
+
+       id = bat_clu_to_page_nr(clu);
+       md = md_page_find(ploop, id);
+       BUG_ON(!md);
+
+       /* Cluster index related to the page[page_id] start */
+       clu = bat_clu_idx_in_page(clu);
+
+       if (bat_level)
+               *bat_level = md->bat_levels[clu];
+       if (md_ret)
+               *md_ret = md;
+
+       bat_entries = kmap_atomic(md->page);
+       dst_clu = bat_entries[clu];
+       kunmap_atomic(bat_entries);
+       return dst_clu;
+}
+
+static inline bool cluster_is_in_top_delta(struct ploop *ploop, u32 clu)
+{
+       u32 dst_clu;
+       u8 level;
+
+       if (WARN_ON(clu >= ploop->nr_bat_entries))
+               return false;
+       dst_clu = ploop_bat_entries(ploop, clu, &level, NULL);
+
+       if (dst_clu == BAT_ENTRY_NONE || level < top_level(ploop))
+               return false;
+       return true;
+}
+
+static inline bool md_page_cluster_is_in_top_delta(struct ploop *ploop,
+                                          struct md_page *md, u32 clu)
+{
+       u32 count, *bat_entries;
+       bool ret = true;
+
+       count = PAGE_SIZE / sizeof(map_index_t);
+       if ((clu + 1) * sizeof(u8) > ksize(md->bat_levels) ||
+           clu >= count) {
+               WARN_ONCE(1, "clu=%u count=%u\n", clu, count);
+               return false;
+       }
+
+       bat_entries = kmap_atomic(md->page);
+       if (bat_entries[clu] == BAT_ENTRY_NONE ||
+           md->bat_levels[clu] < top_level(ploop))
+               ret = false;
+       kunmap_atomic(bat_entries);
+       return ret;
+}
+
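+/*
+ * First and last valid BAT entry indexes within md page @page_id:
+ * the first page skips the hdr entries, the last page ends at @nr_be.
+ */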
+static inline void init_be_iter(u32 nr_be, u32 page_id,
+                               u32 *start, u32 *end)
+{
+       u32 last_page = bat_clu_to_page_nr(nr_be - 1);
+       unsigned int count = PAGE_SIZE / sizeof(map_index_t);
+
+       *start = 0;
+       if (page_id == 0)
+               *start = PLOOP_MAP_OFFSET;
+
+       *end = count - 1;
+       if (page_id == last_page)
+               *end = ((nr_be + PLOOP_MAP_OFFSET) % count) - 1;
+}
+
+static inline void ploop_init_be_iter(struct ploop *ploop, u32 page_id,
+                                     u32 *start, u32 *end)
+{
+       init_be_iter(ploop->nr_bat_entries, page_id, start, end);
+}
+
+extern void __track_pio(struct ploop *ploop, struct pio *pio);
+
+static inline void track_pio(struct ploop *ploop, struct pio *pio)
+{
+       /* See comment in process_tracking_start() about visibility */
+       if (unlikely(ploop->tracking_bitmap))
+               __track_pio(ploop, pio);
+}
+
+extern struct pio *find_pio(struct hlist_head head[], u32 clu);
+
+extern int prealloc_md_pages(struct rb_root *root, u32 nr_bat_entries,
+                            u32 new_nr_bat_entries);
+
+static inline struct pio *bio_to_endio_hook(struct bio *bio)
+{
+       return dm_per_bio_data(bio, sizeof(struct pio));
+}
+
+static inline struct pio *pio_list_pop(struct list_head *pio_list)
+{
+       struct pio *pio;
+
+       pio = list_first_entry_or_null(pio_list, struct pio, list);
+       if (pio)
+               list_del_init(&pio->list);
+       return pio;
+}
+
+#define PLOOP_HASH_TABLE_BITS 5
+#define PLOOP_HASH_TABLE_SIZE (1 << PLOOP_HASH_TABLE_BITS)
+static inline struct hlist_head *ploop_htable_slot(struct hlist_head head[], u32 clu)
+{
+       return &head[hash_32(clu, PLOOP_HASH_TABLE_BITS)];
+}
+
+static inline bool fake_merge_pio(struct pio *pio)
+{
+       if (pio->is_fake_merge) {
+               WARN_ON_ONCE(pio->bi_iter.bi_size ||
+                            pio->bi_op != REQ_OP_WRITE);
+               return true;
+       }
+       return false;
+}
+
+static inline struct pio *alloc_pio(struct ploop *ploop, gfp_t flags)
+{
+       return mempool_alloc(ploop->pio_pool, flags);
+}
+
+static inline void free_pio(struct ploop *ploop, struct pio *pio)
+{
+       mempool_free(pio, ploop->pio_pool);
+}
+
+extern void md_page_insert(struct ploop *ploop, struct md_page *md);
+extern void ploop_free_md_page(struct md_page *md);
+extern void free_md_pages_tree(struct rb_root *root);
+extern bool try_update_bat_entry(struct ploop *ploop, u32 clu,
+                                u8 level, u32 dst_clu);
+
+extern int ploop_add_delta(struct ploop *ploop, u32 level, struct file *file, bool is_raw);
+extern int ploop_check_delta_length(struct ploop *ploop, struct file *file, loff_t *file_size);
+extern void submit_embedded_pios(struct ploop *ploop, struct list_head *list);
+extern void dispatch_pios(struct ploop *ploop, struct pio *pio, struct list_head *pio_list);
+extern void do_ploop_work(struct work_struct *ws);
+extern void do_ploop_fsync_work(struct work_struct *ws);
+extern void ploop_event_work(struct work_struct *work);
+extern int ploop_clone_and_map(struct dm_target *ti, struct request *rq,
+                   union map_info *map_context, struct request **clone);
+extern struct pio *find_lk_of_cluster(struct ploop *ploop, u32 clu);
+extern void init_pio(struct ploop *ploop, unsigned int bi_op, struct pio *pio);
+extern int ploop_rw_page_sync(unsigned rw, struct file *file,
+                             u64 index, struct page *page);
+extern void map_and_submit_rw(struct ploop *ploop, u32 dst_clu, struct pio *pio, u8 level);
+
+extern int ploop_prepare_reloc_index_wb(struct ploop *, struct md_page **, u32, u32 *);
+extern void ploop_break_bat_update(struct ploop *ploop, struct md_page *);
+extern void ploop_index_wb_submit(struct ploop *, struct ploop_index_wb *);
+extern int ploop_message(struct dm_target *ti, unsigned int argc, char **argv,
+                        char *result, unsigned int maxlen);
+
+extern struct pio * alloc_pio_with_pages(struct ploop *ploop);
+extern void free_pio_with_pages(struct ploop *ploop, struct pio *pio);
+extern void pio_prepare_offsets(struct ploop *, struct pio *, u32);
+
+extern int ploop_setup_metadata(struct ploop *ploop, struct page *page);
+extern int ploop_read_delta_metadata(struct ploop *ploop, struct file *file,
+                                    struct rb_root *md_root, u32 *delta_nr_be);
+extern void ploop_index_wb_init(struct ploop_index_wb *piwb, struct ploop *ploop);
+extern void ploop_call_rw_iter(struct file *file, loff_t pos, unsigned rw,
+                              struct iov_iter *iter, struct pio *pio);
+extern void ploop_enospc_timer(struct timer_list *timer);
+#endif /* __DM_PLOOP_H */
diff --git a/mm/vmscan.c b/mm/vmscan.c
index d880cbe9db84..41f05cf20599 100644
--- a/mm/vmscan.c
+++ b/mm/vmscan.c
@@ -3963,7 +3963,7 @@ static int balance_pgdat(pg_data_t *pgdat, int order, int highest_zoneidx)
 
                shrink_tcache(&sc);
                if (sc.nr_reclaimed >= sc.nr_to_reclaim &&
-                   pgdat_balanced(pgdat, order, classzone_idx))
+                   pgdat_balanced(pgdat, order, highest_zoneidx))
                        goto out;
 
 

