This patch uses the "live" glock and some new lvbs to signal when
a node has withdrawn from a file system. Nodes who see this try to
initiate journal recovery. When they withdraw, the need to flush the
metadata buffers and mark the file system read-only so journal
replay doesn't corrupt any data. They also need to avoid certain
operations if the withdraw comes from a journal operation.

Signed-off-by: Bob Peterson <[email protected]>
---
 fs/gfs2/glock.c    |  5 +--
 fs/gfs2/glops.c    | 47 ++++++++++++++++++++++++++
 fs/gfs2/incore.h   |  3 ++
 fs/gfs2/lock_dlm.c | 59 ++++++++++++++++++++++++++++++++
 fs/gfs2/log.c      | 62 ++++++++++++++++++----------------
 fs/gfs2/super.c    |  5 +--
 fs/gfs2/super.h    |  1 +
 fs/gfs2/util.c     | 84 ++++++++++++++++++++++++++++++++++++++++++++++
 fs/gfs2/util.h     | 13 +++++++
 9 files changed, 246 insertions(+), 33 deletions(-)

diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c
index 05431324b262..38a8a5eb8245 100644
--- a/fs/gfs2/glock.c
+++ b/fs/gfs2/glock.c
@@ -543,7 +543,7 @@ __acquires(&gl->gl_lockref.lock)
        int ret;
 
        if (unlikely(test_bit(SDF_SHUTDOWN, &sdp->sd_flags)) &&
-           target != LM_ST_UNLOCKED)
+           gl->gl_ops != &gfs2_nondisk_glops && target != LM_ST_UNLOCKED)
                return;
        lck_flags &= (LM_FLAG_TRY | LM_FLAG_TRY_1CB | LM_FLAG_NOEXP |
                      LM_FLAG_PRIORITY);
@@ -1092,7 +1092,8 @@ int gfs2_glock_nq(struct gfs2_holder *gh)
        struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
        int error = 0;
 
-       if (unlikely(test_bit(SDF_SHUTDOWN, &sdp->sd_flags)))
+       if (unlikely(test_bit(SDF_SHUTDOWN, &sdp->sd_flags) &&
+                    gl->gl_ops != &gfs2_nondisk_glops))
                return -EIO;
 
        if (test_bit(GLF_LRU, &gl->gl_flags))
diff --git a/fs/gfs2/glops.c b/fs/gfs2/glops.c
index c63bee9adb6a..6725bba87690 100644
--- a/fs/gfs2/glops.c
+++ b/fs/gfs2/glops.c
@@ -31,6 +31,8 @@
 
 struct workqueue_struct *gfs2_freeze_wq;
 
+extern struct workqueue_struct *gfs2_control_wq;
+
 static void gfs2_ail_error(struct gfs2_glock *gl, const struct buffer_head *bh)
 {
        fs_err(gl->gl_name.ln_sbd,
@@ -572,6 +574,49 @@ static void iopen_go_callback(struct gfs2_glock *gl, bool 
remote)
        }
 }
 
+/**
+ * nondisk_go_callback - used to signal when a node did a withdraw
+ * @gl: the nondisk glock
+ * @remote: true if this came from a different cluster node
+ *
+ */
+static void nondisk_go_callback(struct gfs2_glock *gl, bool remote)
+{
+       struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
+
+       /* Ignore the callback unless it's from another node, and it's the
+          live lock. */
+       if (!remote || gl->gl_name.ln_number != GFS2_LIVE_LOCK)
+               return;
+
+       /* Ignore the unlock if we're withdrawn, unmounting, or in recovery. */
+       if (test_bit(SDF_NORECOVERY, &sdp->sd_flags) ||
+           test_bit(SDF_SHUTDOWN, &sdp->sd_flags) ||
+           test_bit(GLF_IN_RECOVERY, &sdp->sd_flags))
+               return;
+
+       /* We only care when a node wants us to unlock, because that means
+        * they want a journal recovered. */
+       if (gl->gl_demote_state != LM_ST_UNLOCKED)
+               return;
+
+       if (sdp->sd_args.ar_spectator) {
+               fs_warn(sdp, "Spectator node cannot recover journals.\n");
+               return;
+       }
+
+       fs_warn(sdp, "Some node has withdrawn; checking for recovery.\n");
+       set_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags);
+       /**
+        * We can't call remote_withdraw directly here or gfs2_recover_journal
+        * because this is called from the glock unlock function and the
+        * remote_withdraw needs to enqueue and dequeue the same "live" glock
+        * we were called from. So we queue it to the control work queue in
+        * lock_dlm.
+        */
+       queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work, 0);
+}
+
 const struct gfs2_glock_operations gfs2_meta_glops = {
        .go_type = LM_TYPE_META,
 };
@@ -616,6 +661,8 @@ const struct gfs2_glock_operations gfs2_flock_glops = {
 
 const struct gfs2_glock_operations gfs2_nondisk_glops = {
        .go_type = LM_TYPE_NONDISK,
+       .go_callback = nondisk_go_callback,
+       .go_flags = GLOF_LVB,
 };
 
 const struct gfs2_glock_operations gfs2_quota_glops = {
diff --git a/fs/gfs2/incore.h b/fs/gfs2/incore.h
index 888b62cfd6d1..259f95f15198 100644
--- a/fs/gfs2/incore.h
+++ b/fs/gfs2/incore.h
@@ -346,6 +346,7 @@ enum {
        GLF_OBJECT                      = 14, /* Used only for tracing */
        GLF_BLOCKING                    = 15,
        GLF_INODE_CREATING              = 16, /* Inode creation occurring */
+       GLF_IN_RECOVERY                 = 17, /* glock is in recovery */
 };
 
 struct gfs2_glock {
@@ -622,6 +623,8 @@ enum {
        SDF_SKIP_DLM_UNLOCK     = 8,
        SDF_FORCE_AIL_FLUSH     = 9,
        SDF_AIL1_IO_ERROR       = 10,
+       SDF_REMOTE_WITHDRAW     = 11, /* another node did withdraw */
+       SDF_JOURNAL_WITHDRAW    = 12, /* Was withdrawn from a log operation */
 };
 
 enum gfs2_freeze_state {
diff --git a/fs/gfs2/lock_dlm.c b/fs/gfs2/lock_dlm.c
index c75fe5544ffc..b442d84def8a 100644
--- a/fs/gfs2/lock_dlm.c
+++ b/fs/gfs2/lock_dlm.c
@@ -19,6 +19,8 @@
 
 #include "incore.h"
 #include "glock.h"
+#include "glops.h"
+#include "recovery.h"
 #include "util.h"
 #include "sys.h"
 #include "trace_gfs2.h"
@@ -325,6 +327,7 @@ static void gdlm_cancel(struct gfs2_glock *gl)
 /*
  * dlm/gfs2 recovery coordination using dlm_recover callbacks
  *
+ *  0. gfs2 checks for another cluster node withdraw, needing journal replay
  *  1. dlm_controld sees lockspace members change
  *  2. dlm_controld blocks dlm-kernel locking activity
  *  3. dlm_controld within dlm-kernel notifies gfs2 (recover_prep)
@@ -573,6 +576,56 @@ static int control_lock(struct gfs2_sbd *sdp, int mode, 
uint32_t flags)
                         &ls->ls_control_lksb, "control_lock");
 }
 
+/**
+ * remote_withdraw - react to a node withdrawing from the file system
+ * @sdp: The superblock
+ */
+static void remote_withdraw(struct gfs2_sbd *sdp)
+{
+       struct gfs2_jdesc *jd;
+       struct gfs2_glock *gl = sdp->sd_live_gh.gh_gl;
+       int ret, count = 0;
+
+       /* Dequeue the "live" glock, but keep a reference so it's never freed:
+        * LVBs only work on dlm conversions, not on new glocks. */
+       gfs2_glock_hold(gl);
+       set_bit(GLF_IN_RECOVERY, &sdp->sd_flags);
+       gfs2_glock_dq_wait(&sdp->sd_live_gh);
+
+       fs_info(sdp, "Reacquiring the live glock.\n");
+       /* Re-lock it EX to pick up the lvb bits */
+       gfs2_holder_reinit(LM_ST_EXCLUSIVE, LM_FLAG_NOEXP | GL_EXACT,
+                          &sdp->sd_live_gh);
+       ret = gfs2_glock_nq(&sdp->sd_live_gh);
+       if (ret) {
+               fs_err(sdp, "can't reacquire live glock EX: %d\n", ret);
+               goto out;
+       }
+
+       list_for_each_entry(jd, &sdp->sd_jindex_list, jd_list) {
+               fs_info(sdp, "Checking jid %d.\n", jd->jd_jid);
+               if (test_and_clear_bit_le(jd->jd_jid,
+                                         gl->gl_lksb.sb_lvbptr)) {
+                       count++;
+                       ret = gfs2_recover_journal(jd, true);
+                       if (ret)
+                               break;
+               }
+       }
+       gfs2_glock_dq_wait(&sdp->sd_live_gh);
+       gfs2_holder_reinit(LM_ST_SHARED, LM_FLAG_NOEXP | GL_EXACT,
+                          &sdp->sd_live_gh);
+       /* Re-lock it SH: back to business as usual. */
+       ret = gfs2_glock_nq(&sdp->sd_live_gh);
+       if (ret)
+               fs_err(sdp, "can't reacquire live glock SH: %d\n", ret);
+out:
+       clear_bit(GLF_IN_RECOVERY, &sdp->sd_flags);
+       /* Now drop the additional reference we acquired */
+       fs_err(sdp, "%d journals recovered: ret = %d.\n", count, ret);
+       gfs2_glock_queue_put(gl);
+}
+
 static void gfs2_control_func(struct work_struct *work)
 {
        struct gfs2_sbd *sdp = container_of(work, struct gfs2_sbd, 
sd_control_work.work);
@@ -583,6 +636,12 @@ static void gfs2_control_func(struct work_struct *work)
        int recover_size;
        int i, error;
 
+       /* First check for other nodes that may have done a withdraw. */
+       if (test_and_clear_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags)) {
+               remote_withdraw(sdp);
+               return;
+       }
+
        spin_lock(&ls->ls_recover_spin);
        /*
         * No MOUNT_DONE means we're still mounting; control_mount()
diff --git a/fs/gfs2/log.c b/fs/gfs2/log.c
index 99dd58694ba1..90c0cc541c8b 100644
--- a/fs/gfs2/log.c
+++ b/fs/gfs2/log.c
@@ -92,8 +92,7 @@ static void gfs2_remove_from_ail(struct gfs2_bufdata *bd)
 
 static int gfs2_ail1_start_one(struct gfs2_sbd *sdp,
                               struct writeback_control *wbc,
-                              struct gfs2_trans *tr,
-                              bool *withdraw)
+                              struct gfs2_trans *tr)
 __releases(&sdp->sd_ail_lock)
 __acquires(&sdp->sd_ail_lock)
 {
@@ -110,10 +109,8 @@ __acquires(&sdp->sd_ail_lock)
                if (!buffer_busy(bh)) {
                        if (!buffer_uptodate(bh) &&
                            !test_and_set_bit(SDF_AIL1_IO_ERROR,
-                                             &sdp->sd_flags)) {
-                               gfs2_io_error_bh(sdp, bh);
-                               *withdraw = true;
-                       }
+                                             &sdp->sd_flags))
+                               gfs2_jrnl_io_error_bh(sdp, bh);
                        list_move(&bd->bd_ail_st_list, &tr->tr_ail2_list);
                        continue;
                }
@@ -153,7 +150,6 @@ void gfs2_ail1_flush(struct gfs2_sbd *sdp, struct 
writeback_control *wbc)
        struct list_head *head = &sdp->sd_ail1_list;
        struct gfs2_trans *tr;
        struct blk_plug plug;
-       bool withdraw = false;
 
        trace_gfs2_ail_flush(sdp, wbc, 1);
        blk_start_plug(&plug);
@@ -162,12 +158,15 @@ void gfs2_ail1_flush(struct gfs2_sbd *sdp, struct 
writeback_control *wbc)
        list_for_each_entry_reverse(tr, head, tr_list) {
                if (wbc->nr_to_write <= 0)
                        break;
-               if (gfs2_ail1_start_one(sdp, wbc, tr, &withdraw))
+               if (gfs2_ail1_start_one(sdp, wbc, tr))
                        goto restart;
        }
        spin_unlock(&sdp->sd_ail_lock);
        blk_finish_plug(&plug);
-       if (withdraw)
+       /* If gfs2_ail1_start_one sees an IO error or has an issue, it will
+        * set the JOURNAL_WITHDRAW bit but it won't withdraw. So we do that
+        * now after the whole list has been processed. */
+       if (test_bit(SDF_JOURNAL_WITHDRAW, &sdp->sd_flags))
                gfs2_lm_withdraw(sdp, NULL);
        trace_gfs2_ail_flush(sdp, wbc, 0);
 }
@@ -196,8 +195,7 @@ static void gfs2_ail1_start(struct gfs2_sbd *sdp)
  *
  */
 
-static void gfs2_ail1_empty_one(struct gfs2_sbd *sdp, struct gfs2_trans *tr,
-                               bool *withdraw)
+static void gfs2_ail1_empty_one(struct gfs2_sbd *sdp, struct gfs2_trans *tr)
 {
        struct gfs2_bufdata *bd, *s;
        struct buffer_head *bh;
@@ -210,8 +208,7 @@ static void gfs2_ail1_empty_one(struct gfs2_sbd *sdp, 
struct gfs2_trans *tr,
                        continue;
                if (!buffer_uptodate(bh) &&
                    !test_and_set_bit(SDF_AIL1_IO_ERROR, &sdp->sd_flags)) {
-                       gfs2_io_error_bh(sdp, bh);
-                       *withdraw = true;
+                       gfs2_jrnl_io_error_bh(sdp, bh);
                }
                list_move(&bd->bd_ail_st_list, &tr->tr_ail2_list);
        }
@@ -229,11 +226,10 @@ static int gfs2_ail1_empty(struct gfs2_sbd *sdp)
        struct gfs2_trans *tr, *s;
        int oldest_tr = 1;
        int ret;
-       bool withdraw = false;
 
        spin_lock(&sdp->sd_ail_lock);
        list_for_each_entry_safe_reverse(tr, s, &sdp->sd_ail1_list, tr_list) {
-               gfs2_ail1_empty_one(sdp, tr, &withdraw);
+               gfs2_ail1_empty_one(sdp, tr);
                if (list_empty(&tr->tr_ail1_list) && oldest_tr)
                        list_move(&tr->tr_list, &sdp->sd_ail2_list);
                else
@@ -242,7 +238,7 @@ static int gfs2_ail1_empty(struct gfs2_sbd *sdp)
        ret = list_empty(&sdp->sd_ail1_list);
        spin_unlock(&sdp->sd_ail_lock);
 
-       if (withdraw)
+       if (test_bit(SDF_JOURNAL_WITHDRAW, &sdp->sd_flags))
                gfs2_lm_withdraw(sdp, "fatal: I/O error(s)\n");
 
        return ret;
@@ -509,8 +505,8 @@ static void log_pull_tail(struct gfs2_sbd *sdp, unsigned 
int new_tail)
 
        atomic_add(dist, &sdp->sd_log_blks_free);
        trace_gfs2_log_blocks(sdp, dist);
-       gfs2_assert_withdraw(sdp, atomic_read(&sdp->sd_log_blks_free) <=
-                            sdp->sd_jdesc->jd_blocks);
+       gfs2_jrnl_assert_withdraw(sdp, atomic_read(&sdp->sd_log_blks_free) <=
+                                 sdp->sd_jdesc->jd_blocks);
 
        sdp->sd_log_tail = new_tail;
 }
@@ -683,12 +679,18 @@ void gfs2_write_log_header(struct gfs2_sbd *sdp, struct 
gfs2_jdesc *jd,
 {
        struct gfs2_log_header *lh;
        u32 hash, crc;
-       struct page *page = mempool_alloc(gfs2_page_pool, GFP_NOIO);
+       struct page *page;
        struct gfs2_statfs_change_host *l_sc = &sdp->sd_statfs_local;
        struct timespec64 tv;
        struct super_block *sb = sdp->sd_vfs;
        u64 addr;
 
+       if (test_bit(SDF_SHUTDOWN, &sdp->sd_flags) ||
+           test_bit(SDF_AIL1_IO_ERROR, &sdp->sd_flags)) {
+               log_flush_wait(sdp);
+               return;
+       }
+       page = mempool_alloc(gfs2_page_pool, GFP_NOIO);
        lh = page_address(page);
        clear_page(lh);
 
@@ -801,12 +803,13 @@ void gfs2_log_flush(struct gfs2_sbd *sdp, struct 
gfs2_glock *gl, u32 flags)
                INIT_LIST_HEAD(&tr->tr_ail2_list);
                tr->tr_first = sdp->sd_log_flush_head;
                if (unlikely (state == SFS_FROZEN))
-                       gfs2_assert_withdraw(sdp, !tr->tr_num_buf_new && 
!tr->tr_num_databuf_new);
+                       gfs2_jrnl_assert_withdraw(sdp, !tr->tr_num_buf_new &&
+                                                 !tr->tr_num_databuf_new);
        }
 
        if (unlikely(state == SFS_FROZEN))
-               gfs2_assert_withdraw(sdp, !sdp->sd_log_num_revoke);
-       gfs2_assert_withdraw(sdp,
+               gfs2_jrnl_assert_withdraw(sdp, !sdp->sd_log_num_revoke);
+       gfs2_jrnl_assert_withdraw(sdp,
                        sdp->sd_log_num_revoke == sdp->sd_log_commited_revoke);
 
        gfs2_ordered_write(sdp);
@@ -894,7 +897,8 @@ static void log_refund(struct gfs2_sbd *sdp, struct 
gfs2_trans *tr)
        if (sdp->sd_log_tr) {
                gfs2_merge_trans(sdp->sd_log_tr, tr);
        } else if (tr->tr_num_buf_new || tr->tr_num_databuf_new) {
-               gfs2_assert_withdraw(sdp, test_bit(TR_ALLOCED, &tr->tr_flags));
+               gfs2_jrnl_assert_withdraw(sdp, test_bit(TR_ALLOCED,
+                                                       &tr->tr_flags));
                sdp->sd_log_tr = tr;
                set_bit(TR_ATTACHED, &tr->tr_flags);
        }
@@ -902,12 +906,12 @@ static void log_refund(struct gfs2_sbd *sdp, struct 
gfs2_trans *tr)
        sdp->sd_log_commited_revoke += tr->tr_num_revoke - tr->tr_num_revoke_rm;
        reserved = calc_reserved(sdp);
        maxres = sdp->sd_log_blks_reserved + tr->tr_reserved;
-       gfs2_assert_withdraw(sdp, maxres >= reserved);
+       gfs2_jrnl_assert_withdraw(sdp, maxres >= reserved);
        unused = maxres - reserved;
        atomic_add(unused, &sdp->sd_log_blks_free);
        trace_gfs2_log_blocks(sdp, unused);
-       gfs2_assert_withdraw(sdp, atomic_read(&sdp->sd_log_blks_free) <=
-                            sdp->sd_jdesc->jd_blocks);
+       gfs2_jrnl_assert_withdraw(sdp, atomic_read(&sdp->sd_log_blks_free) <=
+                                 sdp->sd_jdesc->jd_blocks);
        sdp->sd_log_blks_reserved = reserved;
 
        gfs2_log_unlock(sdp);
@@ -946,9 +950,9 @@ void gfs2_log_commit(struct gfs2_sbd *sdp, struct 
gfs2_trans *tr)
 
 void gfs2_log_shutdown(struct gfs2_sbd *sdp)
 {
-       gfs2_assert_withdraw(sdp, !sdp->sd_log_blks_reserved);
-       gfs2_assert_withdraw(sdp, !sdp->sd_log_num_revoke);
-       gfs2_assert_withdraw(sdp, list_empty(&sdp->sd_ail1_list));
+       gfs2_jrnl_assert_withdraw(sdp, !sdp->sd_log_blks_reserved);
+       gfs2_jrnl_assert_withdraw(sdp, !sdp->sd_log_num_revoke);
+       gfs2_jrnl_assert_withdraw(sdp, list_empty(&sdp->sd_ail1_list));
 
        sdp->sd_log_flush_head = sdp->sd_log_head;
 
diff --git a/fs/gfs2/super.c b/fs/gfs2/super.c
index ca71163ff7cf..040e96e01231 100644
--- a/fs/gfs2/super.c
+++ b/fs/gfs2/super.c
@@ -844,7 +844,7 @@ static void gfs2_dirty_inode(struct inode *inode, int flags)
  * Returns: errno
  */
 
-static int gfs2_make_fs_ro(struct gfs2_sbd *sdp)
+int gfs2_make_fs_ro(struct gfs2_sbd *sdp)
 {
        struct gfs2_holder freeze_gh;
        int error;
@@ -922,7 +922,8 @@ static void gfs2_put_super(struct super_block *sb)
 
        if (!sdp->sd_args.ar_spectator) {
                gfs2_glock_dq_uninit(&sdp->sd_journal_gh);
-               gfs2_glock_dq_uninit(&sdp->sd_jinode_gh);
+               if (gfs2_holder_initialized(&sdp->sd_jinode_gh))
+                       gfs2_glock_dq_uninit(&sdp->sd_jinode_gh);
                gfs2_glock_dq_uninit(&sdp->sd_sc_gh);
                gfs2_glock_dq_uninit(&sdp->sd_qc_gh);
                iput(sdp->sd_sc_inode);
diff --git a/fs/gfs2/super.h b/fs/gfs2/super.h
index 73c97dccae21..aa401752da15 100644
--- a/fs/gfs2/super.h
+++ b/fs/gfs2/super.h
@@ -46,6 +46,7 @@ extern void update_statfs(struct gfs2_sbd *sdp, struct 
buffer_head *m_bh,
                          struct buffer_head *l_bh);
 extern int gfs2_statfs_sync(struct super_block *sb, int type);
 extern void gfs2_freeze_func(struct work_struct *work);
+extern int gfs2_make_fs_ro(struct gfs2_sbd *sdp);
 
 extern struct file_system_type gfs2_fs_type;
 extern struct file_system_type gfs2meta_fs_type;
diff --git a/fs/gfs2/util.c b/fs/gfs2/util.c
index 0a814ccac41d..0e3753175c68 100644
--- a/fs/gfs2/util.c
+++ b/fs/gfs2/util.c
@@ -14,12 +14,15 @@
 #include <linux/buffer_head.h>
 #include <linux/crc32.h>
 #include <linux/gfs2_ondisk.h>
+#include <linux/delay.h>
 #include <linux/uaccess.h>
 
 #include "gfs2.h"
 #include "incore.h"
 #include "glock.h"
+#include "log.h"
 #include "rgrp.h"
+#include "super.h"
 #include "util.h"
 
 struct kmem_cache *gfs2_glock_cachep __read_mostly;
@@ -36,6 +39,69 @@ void gfs2_assert_i(struct gfs2_sbd *sdp)
        fs_emerg(sdp, "fatal assertion failed\n");
 }
 
+static void signal_our_withdraw(struct gfs2_sbd *sdp)
+{
+       struct gfs2_glock *gl = sdp->sd_live_gh.gh_gl;
+       int ret;
+
+       /**
+        * Don't tell dlm we're bailing until we have no more buffers in the
+        * wind. If journal had an IO error, the log code should just purge
+        * the outstanding buffers rather than submitting new IO. Making the
+        * file system read-only will flush the journal, etc.
+        */
+       fs_err(sdp, "Making the file system read-only.\n");
+       gfs2_make_fs_ro(sdp);
+       sdp->sd_vfs->s_flags |= MS_RDONLY;
+
+       /* Drop the glock for our journal so another node can recover it. */
+       fs_err(sdp, "Dropping our journal glock to allow recovery.\n");
+       gfs2_glock_dq_uninit(&sdp->sd_jinode_gh);
+
+       /**
+        * Dequeue the "live" glock, but keep a reference so it's never freed:
+        * LVBs only work on dlm conversions, not on new glocks.
+        */
+       gfs2_glock_hold(gl);
+       gfs2_glock_dq_wait(&sdp->sd_live_gh);
+       /**
+        * We enqueue the "live" glock in EX so that all other nodes
+        * get a demote request and act on it, demoting their glock
+        * from SHARED to UNLOCKED. Once we have the glock in EX, we
+        * know all other nodes have been informed of our departure.
+        * They cannot do anything more until our journal has been
+        * replayed and our locks released.
+        *
+        * We set a bit in the glock's lvb indicating which journal
+        * needs to be replayed, but we can't recover it ourselves
+        * because the withdraw might have been due to an io error, etc.
+        */
+       fs_warn(sdp, "Requesting recovery of jid %d.\n",
+               sdp->sd_lockstruct.ls_jid);
+       gfs2_holder_reinit(LM_ST_EXCLUSIVE, LM_FLAG_NOEXP | GL_EXACT,
+                          &sdp->sd_live_gh);
+       ret = gfs2_glock_nq(&sdp->sd_live_gh);
+       if (ret) {
+               fs_err(sdp, "can't get live glock EX: %d\n", ret);
+               return;
+       }
+       __set_bit_le(sdp->sd_lockstruct.ls_jid, gl->gl_lksb.sb_lvbptr);
+       gfs2_glock_dq_wait(&sdp->sd_live_gh);
+       /* Here we sleep so that the other node can grab the lvb we wrote when
+        * the glock is demoted to UN. If we go too fast, dlm might just
+        * do a conversion from EX to SH which seems to destroy the lvb. */
+       msleep(100);
+       /* Drop the EX lock so the lvb gets written. Then reacquire in SH. */
+       gfs2_holder_reinit(LM_ST_SHARED, LM_FLAG_NOEXP | GL_EXACT,
+                          &sdp->sd_live_gh);
+       ret = gfs2_glock_nq(&sdp->sd_live_gh);
+       if (ret)
+               fs_err(sdp, "Can't reacquire live glock SH: %d\n", ret);
+
+       /* Now drop the additional reference we acquired */
+       gfs2_glock_queue_put(gl);
+}
+
 int gfs2_lm_withdraw(struct gfs2_sbd *sdp, const char *fmt, ...)
 {
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
@@ -62,6 +128,8 @@ int gfs2_lm_withdraw(struct gfs2_sbd *sdp, const char *fmt, 
...)
                fs_err(sdp, "about to withdraw this file system\n");
                BUG_ON(sdp->sd_args.ar_debug);
 
+               signal_our_withdraw(sdp);
+
                kobject_uevent(&sdp->sd_kobj, KOBJ_OFFLINE);
 
                if (!strcmp(sdp->sd_lockstruct.ls_ops->lm_proto_name, 
"lock_dlm"))
@@ -100,6 +168,15 @@ int gfs2_assert_withdraw_i(struct gfs2_sbd *sdp, char 
*assertion,
        return (me) ? -1 : -2;
 }
 
+int gfs2_assert_withdraw_j(struct gfs2_sbd *sdp, char *assertion,
+                          const char *function, char *file, unsigned int line)
+{
+       if (!test_and_set_bit(SDF_JOURNAL_WITHDRAW, &sdp->sd_flags))
+               return gfs2_assert_withdraw_i(sdp, assertion, function, file,
+                                             line);
+       return -2;
+}
+
 /**
  * gfs2_assert_warn_i - Print a message to the console if @assertion is false
  * Returns: -1 if we printed something
@@ -270,3 +347,10 @@ void gfs2_io_error_bh_i(struct gfs2_sbd *sdp, struct 
buffer_head *bh,
                gfs2_lm_withdraw(sdp, NULL);
 }
 
+void gfs2_jrnl_io_error_bh_i(struct gfs2_sbd *sdp, struct buffer_head *bh,
+                            const char *function, char *file,
+                            unsigned int line, bool withdraw)
+{
+       if (!test_and_set_bit(SDF_JOURNAL_WITHDRAW, &sdp->sd_flags))
+               gfs2_io_error_bh_i(sdp, bh, function, file, line, withdraw);
+}
diff --git a/fs/gfs2/util.h b/fs/gfs2/util.h
index 9278fecba632..336ad543efe1 100644
--- a/fs/gfs2/util.h
+++ b/fs/gfs2/util.h
@@ -41,11 +41,17 @@ do { \
 
 int gfs2_assert_withdraw_i(struct gfs2_sbd *sdp, char *assertion,
                           const char *function, char *file, unsigned int line);
+int gfs2_assert_withdraw_j(struct gfs2_sbd *sdp, char *assertion,
+                          const char *function, char *file, unsigned int line);
 
 #define gfs2_assert_withdraw(sdp, assertion) \
 ((likely(assertion)) ? 0 : gfs2_assert_withdraw_i((sdp), #assertion, \
                                        __func__, __FILE__, __LINE__))
 
+#define gfs2_jrnl_assert_withdraw(sdp, assertion) \
+((likely(assertion)) ? 0 : gfs2_assert_withdraw_j((sdp), #assertion, \
+                                       __func__, __FILE__, __LINE__))
+
 
 int gfs2_assert_warn_i(struct gfs2_sbd *sdp, char *assertion,
                       const char *function, char *file, unsigned int line);
@@ -140,12 +146,19 @@ void gfs2_io_error_bh_i(struct gfs2_sbd *sdp, struct 
buffer_head *bh,
                        const char *function, char *file, unsigned int line,
                        bool withdraw);
 
+void gfs2_jrnl_io_error_bh_i(struct gfs2_sbd *sdp, struct buffer_head *bh,
+                            const char *function, char *file,
+                            unsigned int line, bool withdraw);
+
 #define gfs2_io_error_bh_wd(sdp, bh) \
 gfs2_io_error_bh_i((sdp), (bh), __func__, __FILE__, __LINE__, true);
 
 #define gfs2_io_error_bh(sdp, bh) \
 gfs2_io_error_bh_i((sdp), (bh), __func__, __FILE__, __LINE__, false);
 
+#define gfs2_jrnl_io_error_bh(sdp, bh) \
+gfs2_jrnl_io_error_bh_i((sdp), (bh), __func__, __FILE__, __LINE__, false);
+
 
 extern struct kmem_cache *gfs2_glock_cachep;
 extern struct kmem_cache *gfs2_glock_aspace_cachep;
-- 
2.17.2

Reply via email to