The commit is pushed to "branch-rh7-3.10.0-327.18.2.vz7.14.x-ovz" and will 
appear at https://src.openvz.org/scm/ovz/vzkernel.git
after rh7-3.10.0-327.18.2.vz7.14.20
------>
commit 6a64ab2f267073beffc51d4206edd5b0bc37b19a
Author: Maxim Patlasov <[email protected]>
Date:   Mon Jun 27 11:11:23 2016 +0400

    ploop: push_backup: health monitor thread
    
    ploop_pb_health_monitor() is a stand-alone thread that responds to the
    "expiration" event by calling ploop_pb_stop(). It lives as long as the
    ploop_pushbackup_desc exists.
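    
    For illustration, the "expiration" trigger that flips the state is
    expected to look roughly like the sketch below; it is not part of this
    patch, and the function name is hypothetical:
    
    static void ploop_pb_timeout(struct ploop_pushbackup_desc *pbd)
    {
            /* ask the health monitor thread to abort push_backup */
            spin_lock(&pbd->ppb_lock);
            if (pbd->ppb_state == PLOOP_PB_ALIVE)
                    pbd->ppb_state = PLOOP_PB_STOPPING;
            spin_unlock(&pbd->ppb_lock);
            wake_up(&pbd->ppb_waitq);
    }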
    
    The patch also introduces pbd->ppb_state with three possible values:
    
    enum {
        PLOOP_PB_ALIVE,
        PLOOP_PB_STOPPING,
        PLOOP_PB_DEAD,
    };
    
    PLOOP_PB_ALIVE is the default and means that everything is fine.
    PLOOP_PB_DEAD essentially means that ploop_pb_stop() was called, i.e. the
    bitmask was merged back to CBT (unless userspace explicitly requested not
    to do so), and all ploop requests from the pending/reported trees were
    forcibly returned to the plo->ready queue. PLOOP_PB_STOPPING is a
    transitional state: we have decided to abort push_backup due to a timeout,
    but ploop_pb_health_monitor() has not processed it yet.
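    
    To summarize the intended transitions (a sketch, not code from this patch):
    
        PLOOP_PB_ALIVE    --(timeout)-->                PLOOP_PB_STOPPING
        PLOOP_PB_STOPPING --(monitor, ploop_pb_stop())--> PLOOP_PB_DEAD
    
    Note that ploop_pb_stop() only bails out early if the state is already
    PLOOP_PB_DEAD, so a direct ALIVE -> DEAD transition is also possible when
    ploop_pb_stop() is invoked by another caller under plo->ctl_mutex.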
    
    https://jira.sw.ru/browse/PSBM-48082
    
    Signed-off-by: Maxim Patlasov <[email protected]>
---
 drivers/block/ploop/push_backup.c | 86 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 85 insertions(+), 1 deletion(-)

diff --git a/drivers/block/ploop/push_backup.c b/drivers/block/ploop/push_backup.c
index 5058ec1..8fae402 100644
--- a/drivers/block/ploop/push_backup.c
+++ b/drivers/block/ploop/push_backup.c
@@ -31,6 +31,12 @@ struct pb_set {
        struct ploop_pushbackup_desc *pbd; /* points to parent pbd */
 };
 
+enum {
+       PLOOP_PB_ALIVE,
+       PLOOP_PB_STOPPING,
+       PLOOP_PB_DEAD,
+};
+
 struct ploop_pushbackup_desc {
        struct ploop_device *plo;
        struct page **cbt_map; /* a 'snapshot' copy of CBT mask */
@@ -48,6 +54,10 @@ struct ploop_pushbackup_desc {
 
        struct pb_set         pending_set;
        struct pb_set         reported_set;
+
+       struct task_struct   *health_monitor_thread;
+       wait_queue_head_t     ppb_waitq;
+       int                   ppb_state; /* see enum above */
 };
 
 int ploop_pb_check_uuid(struct ploop_pushbackup_desc *pbd, __u8 *uuid)
@@ -173,6 +183,7 @@ struct ploop_pushbackup_desc *ploop_pb_alloc(struct ploop_device *plo)
        init_completion(&pbd->ppb_comp);
        ploop_pbs_init(&pbd->pending_set, pbd, "pending");
        ploop_pbs_init(&pbd->reported_set, pbd, "reported");
+       init_waitqueue_head(&pbd->ppb_waitq);
        pbd->plo = plo;
 
        return pbd;
@@ -330,8 +341,42 @@ static int convert_map_to_map(struct ploop_pushbackup_desc *pbd)
 
 }
 
+static int ploop_pb_health_monitor(void *data)
+{
+       struct ploop_pushbackup_desc *pbd = data;
+       struct ploop_device          *plo = pbd->plo;
+
+       spin_lock(&pbd->ppb_lock);
+       while (!kthread_should_stop() || pbd->ppb_state == PLOOP_PB_STOPPING) {
+
+               DEFINE_WAIT(_wait);
+               for (;;) {
+                       prepare_to_wait(&pbd->ppb_waitq, &_wait, TASK_INTERRUPTIBLE);
+                       if (pbd->ppb_state == PLOOP_PB_STOPPING ||
+                           kthread_should_stop())
+                               break;
+
+                       spin_unlock(&pbd->ppb_lock);
+                       schedule();
+                       spin_lock(&pbd->ppb_lock);
+               }
+               finish_wait(&pbd->ppb_waitq, &_wait);
+
+               if (pbd->ppb_state == PLOOP_PB_STOPPING) {
+                       spin_unlock(&pbd->ppb_lock);
+                       mutex_lock(&plo->ctl_mutex);
+                       ploop_pb_stop(pbd, true);
+                       mutex_unlock(&plo->ctl_mutex);
+                       spin_lock(&pbd->ppb_lock);
+               }
+       }
+       spin_unlock(&pbd->ppb_lock);
+       return 0;
+}
+
 int ploop_pb_init(struct ploop_pushbackup_desc *pbd, __u8 *uuid, bool full)
 {
+       struct task_struct *ts;
        int rc;
 
        memcpy(pbd->cbt_uuid, uuid, sizeof(pbd->cbt_uuid));
@@ -359,7 +404,18 @@ int ploop_pb_init(struct ploop_pushbackup_desc *pbd, __u8 *uuid, bool full)
        if (rc)
                return rc;
 
-       return convert_map_to_map(pbd);
+       rc = convert_map_to_map(pbd);
+       if (rc)
+               return rc;
+
+       ts = kthread_create(ploop_pb_health_monitor, pbd, "ploop_pb_hm%d",
+                           pbd->plo->index);
+       if (IS_ERR(ts))
+               return PTR_ERR(ts);
+
+       pbd->health_monitor_thread = ts;
+       wake_up_process(ts);
+       return 0;
 }
 
 void ploop_pb_fini(struct ploop_pushbackup_desc *pbd)
@@ -372,6 +428,11 @@ void ploop_pb_fini(struct ploop_pushbackup_desc *pbd)
        if (!RB_EMPTY_ROOT(&pbd->reported_set.tree))
                printk("ploop_pb_fini: reported_tree is not empty!\n");
 
+       if (pbd->health_monitor_thread) {
+               kthread_stop(pbd->health_monitor_thread);
+               pbd->health_monitor_thread = NULL;
+       }
+
        if (pbd->plo) {
                struct ploop_device *plo = pbd->plo;
                mutex_lock(&plo->sysfs_mutex);
@@ -552,6 +613,11 @@ int ploop_pb_preq_add_pending(struct ploop_pushbackup_desc *pbd,
 
        spin_lock(&pbd->ppb_lock);
 
+       if (pbd->ppb_state != PLOOP_PB_ALIVE) {
+               spin_unlock(&pbd->ppb_lock);
+               return -ESTALE;
+       }
+
        if (!test_bit(PLOOP_S_PUSH_BACKUP, &pbd->plo->state)) {
                spin_unlock(&pbd->ppb_lock);
                return -EINTR;
@@ -572,6 +638,7 @@ int ploop_pb_preq_add_pending(struct ploop_pushbackup_desc *pbd,
        return 0;
 }
 
+/* Always serialized by plo->ctl_mutex */
 unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd, bool do_merge)
 {
        unsigned long ret = 0;
@@ -581,6 +648,14 @@ unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd, bool do_merge)
        if (pbd == NULL)
                return 0;
 
+       spin_lock(&pbd->ppb_lock);
+       if (pbd->ppb_state == PLOOP_PB_DEAD) {
+               spin_unlock(&pbd->ppb_lock);
+               return 0;
+       }
+       pbd->ppb_state = PLOOP_PB_DEAD;
+       spin_unlock(&pbd->ppb_lock);
+
        ploop_pbs_fini(&pbd->pending_set);
        ploop_pbs_fini(&pbd->reported_set);
 
@@ -639,6 +714,10 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
                }
 
                 /* blocking case */
+               if (pbd->ppb_state != PLOOP_PB_ALIVE) {
+                       err = -ESTALE;
+                       goto get_pending_unlock;
+               }
                if (unlikely(pbd->ppb_waiting)) {
                        /* Other task is already waiting for event */
                        err = -EBUSY;
@@ -662,6 +741,8 @@ int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
                if (!preq) {
                        if (!test_bit(PLOOP_S_PUSH_BACKUP, &plo->state))
                                err = -EINTR;
+                       else if (pbd->ppb_state != PLOOP_PB_ALIVE)
+                               err = -ESTALE;
                        else if (signal_pending(current))
                                err = -ERESTARTSYS;
                        else err = -ENOENT;
@@ -726,6 +807,9 @@ int ploop_pb_peek(struct ploop_pushbackup_desc *pbd,
        if (block >= pbd->ppb_block_max)
                return -ENOENT;
 
+       if (pbd->ppb_state != PLOOP_PB_ALIVE)
+               return -ESTALE;
+
        page = alloc_page(GFP_KERNEL);
        if (!page)
                return -ENOMEM;