This patch uses the "live" glock and some new lvbs to signal when
a node has withdrawn from a file system. Nodes who see this try to
initiate journal recovery.

Definitions:

1. The "withdrawer" is a node that has withdrawn from a gfs2
   file system due to some inconsistency.
2. The "recoverer" is a node who sees the need to replay the
   withdrawers's journal.

The way it works is as follows:

When things are running normally, all nodes with the file system
mounted have the "live" (non-disk) glock enqueued in SH (shared)
mode.

When a node withdraws from the file system, the withdrawer
dequeues the "live" glock and re-enqueues it in EX mode. This
forces all other nodes to see a demote request for the glock.
In the act of demoting and promoting the live glock, a new lvb
is used to inform the other nodes that it has withdrawn and its
journal needs recovery.

Although unlikely, there may be multiple withdrawers and dlm
keeps the lvb consistent between them, so many of these lvb bits
may be set at any given time, indicating more than one node has
withdrawn and all their journals need recovery.

Once the withdrawer has the live glock in EX, it knows all the
other nodes have the live glock in UN (unlocked) and have gone
through their callback, so they know it has withdrawn.

The withdrawer then dequeues the live glock, demoting it back
to UN, then pauses to give the other nodes a chance to
reacquire it. This pause will hopefully prevent gfs2's caching
the transition from EX->UN->SH from simplifying it to a DLM
lock convert directly from EX->SH, which would keep the other
nodes from doing recovery by making them wait forever for the
glock (because once it's back in SH, nothing else is going to
demote it, short of an unmount).

After the pause, it re-enqueues the live glock in shared (SH)
mode once again, thus enabling everyone to resume working
normally after recovery has taken place.

When the other nodes see the demote of the live lock, they
set a flag in the superblock indicating recovery is in progress.

Unfortunately, lvbs are only passed by dlm on lock conversions,
so the nodes need to re-enqueue the live glock. They all need
it back in SH mode eventually, but only after journal recovery.
They also need to prevent other nodes from doing recovery at
the same time for the failed node's journal. For node failures,
DLM takes care of this problem for us by setting its BLOCK_LOCKS
flag and going through its recovery state machine. However, in
the case of a withdraw, we can't use the same mechanism.

So the recoverer acquires the live glock in EX, then performs
journal recovery for all journals indicated in the lvb. Taking
the live glock in EX at this time may cause a race with the
original withdrawer, who enqueued in EX, then UN, then back to
SH. There's a new superblock flag GLF_IN_RECOVERY that is set
when the node withdraws and isn't cleared until recovery is
complete. That prevents the recoverer's new demote callback
for the live lock in EX from mistaking it for another withdraw
of the recoverer.

The recoverer's promote to EX does not cause a glock demote on
any other potential recoverer, since by definition they must
have already demoted the glock to UN before the recoverer got it.

The first recoverer is granted the live glock in EX and performs
journal recovery, clearing the recovery bits in the live glock
lvb as it does. The recoverer then demotes the glock from EX
back to SH.

All subsequent nodes also acquire the glock in EX, but
the lvb bits are then clear, so no additional recovery is done.
They skip the bulk of it and transition the live glock back to
SH too.

In order to accomodate this process, I had to make several
additional changes:

(1) I had to allow non-disk glocks (like the live glock) to be
    enqueued and promoted in do_xmote, even when the file system
    is withdrawn. This should never cause a problem, since they
    are "non-disk" and therefore have no tangible resources to
    corrupt.
(2) The glock demote callback for the live lock cannot call
    journal recovery directly. That's because it's a callback
    made from dlm, which means it has dlm resources locked.
    If you try to call recovery here, it would be a circular
    dependency that could never be filled: dlm's callback would
    try to lock dlm resources it already has locked. Therefore
    I had to separate the actual recovery from the callback.
    I therefore modified gfs2_control_func and added a step
    0 (as Dave Teigland suggested) to signal a withdraw.
    Instead, the callback queues work to the workqueue that
    causes gfs2_control_func to run in a different context
    after the callback is complete. To that end, I changed
    function gfs2_control_func (the node failure recovery
    state machine) to perform this new step 0 to recover
    withdrawer's journals with new function remote_withdraw
    rather than its normal "dead node" recovery sequence.
(3) GFS2 now needs to know the difference between a withdraw
    that occurs inside any of the journal-related code from
    all other withdraws. This is needed to prevent recursion
    where an error (e.g. i/o error writing to the journal)
    causes a withdraw, and the withdraw tries to write out the
    ail lists (or some such) causing more withdraws.
    To prevent recursion, I added a new SDF_JOURNAL_WITHDRAW
    flag that prevents withdraw from recursing on itself.
(4) Instances of the local "withdraw" variable are now
    replaced by a new superblock flag because once the file
    system is withdrawn in this way, it needs to affect its
    behavior that from that point on until unmount or reboot.

Signed-off-by: Bob Peterson <rpete...@redhat.com>
---
 fs/gfs2/glock.c    |  5 +--
 fs/gfs2/glops.c    | 47 ++++++++++++++++++++++++++
 fs/gfs2/incore.h   |  3 ++
 fs/gfs2/lock_dlm.c | 59 ++++++++++++++++++++++++++++++++
 fs/gfs2/log.c      | 62 ++++++++++++++++++----------------
 fs/gfs2/super.c    |  5 +--
 fs/gfs2/super.h    |  1 +
 fs/gfs2/util.c     | 84 ++++++++++++++++++++++++++++++++++++++++++++++
 fs/gfs2/util.h     | 13 +++++++
 9 files changed, 246 insertions(+), 33 deletions(-)

diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c
index 05431324b262..38a8a5eb8245 100644
--- a/fs/gfs2/glock.c
+++ b/fs/gfs2/glock.c
@@ -543,7 +543,7 @@ __acquires(&gl->gl_lockref.lock)
        int ret;
 
        if (unlikely(test_bit(SDF_SHUTDOWN, &sdp->sd_flags)) &&
-           target != LM_ST_UNLOCKED)
+           gl->gl_ops != &gfs2_nondisk_glops && target != LM_ST_UNLOCKED)
                return;
        lck_flags &= (LM_FLAG_TRY | LM_FLAG_TRY_1CB | LM_FLAG_NOEXP |
                      LM_FLAG_PRIORITY);
@@ -1092,7 +1092,8 @@ int gfs2_glock_nq(struct gfs2_holder *gh)
        struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
        int error = 0;
 
-       if (unlikely(test_bit(SDF_SHUTDOWN, &sdp->sd_flags)))
+       if (unlikely(test_bit(SDF_SHUTDOWN, &sdp->sd_flags) &&
+                    gl->gl_ops != &gfs2_nondisk_glops))
                return -EIO;
 
        if (test_bit(GLF_LRU, &gl->gl_flags))
diff --git a/fs/gfs2/glops.c b/fs/gfs2/glops.c
index f79ef9525e33..d1ea83a84862 100644
--- a/fs/gfs2/glops.c
+++ b/fs/gfs2/glops.c
@@ -32,6 +32,8 @@
 
 struct workqueue_struct *gfs2_freeze_wq;
 
+extern struct workqueue_struct *gfs2_control_wq;
+
 static void gfs2_ail_error(struct gfs2_glock *gl, const struct buffer_head *bh)
 {
        fs_err(gl->gl_name.ln_sbd,
@@ -573,6 +575,49 @@ static void iopen_go_callback(struct gfs2_glock *gl, bool 
remote)
        }
 }
 
+/**
+ * nondisk_go_callback - used to signal when a node did a withdraw
+ * @gl: the nondisk glock
+ * @remote: true if this came from a different cluster node
+ *
+ */
+static void nondisk_go_callback(struct gfs2_glock *gl, bool remote)
+{
+       struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
+
+       /* Ignore the callback unless it's from another node, and it's the
+          live lock. */
+       if (!remote || gl->gl_name.ln_number != GFS2_LIVE_LOCK)
+               return;
+
+       /* Ignore the unlock if we're withdrawn, unmounting, or in recovery. */
+       if (test_bit(SDF_NORECOVERY, &sdp->sd_flags) ||
+           test_bit(SDF_SHUTDOWN, &sdp->sd_flags) ||
+           test_bit(SDF_IN_RECOVERY, &sdp->sd_flags))
+               return;
+
+       /* We only care when a node wants us to unlock, because that means
+        * they want a journal recovered. */
+       if (gl->gl_demote_state != LM_ST_UNLOCKED)
+               return;
+
+       if (sdp->sd_args.ar_spectator) {
+               fs_warn(sdp, "Spectator node cannot recover journals.\n");
+               return;
+       }
+
+       fs_warn(sdp, "Some node has withdrawn; checking for recovery.\n");
+       set_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags);
+       /**
+        * We can't call remote_withdraw directly here or gfs2_recover_journal
+        * because this is called from the glock unlock function and the
+        * remote_withdraw needs to enqueue and dequeue the same "live" glock
+        * we were called from. So we queue it to the control work queue in
+        * lock_dlm.
+        */
+       queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work, 0);
+}
+
 const struct gfs2_glock_operations gfs2_meta_glops = {
        .go_type = LM_TYPE_META,
 };
@@ -617,6 +662,8 @@ const struct gfs2_glock_operations gfs2_flock_glops = {
 
 const struct gfs2_glock_operations gfs2_nondisk_glops = {
        .go_type = LM_TYPE_NONDISK,
+       .go_callback = nondisk_go_callback,
+       .go_flags = GLOF_LVB,
 };
 
 const struct gfs2_glock_operations gfs2_quota_glops = {
diff --git a/fs/gfs2/incore.h b/fs/gfs2/incore.h
index 663759abe60d..9f9b1a760ba5 100644
--- a/fs/gfs2/incore.h
+++ b/fs/gfs2/incore.h
@@ -621,6 +621,9 @@ enum {
        SDF_SKIP_DLM_UNLOCK     = 8,
        SDF_FORCE_AIL_FLUSH     = 9,
        SDF_AIL1_IO_ERROR       = 10,
+       SDF_REMOTE_WITHDRAW     = 11, /* another node did withdraw */
+       SDF_JOURNAL_WITHDRAW    = 12, /* Was withdrawn from a log operation */
+       SDF_IN_RECOVERY         = 13, /* Performing remote recovery */
 };
 
 enum gfs2_freeze_state {
diff --git a/fs/gfs2/lock_dlm.c b/fs/gfs2/lock_dlm.c
index 68ca648cf918..ac74fa3531cf 100644
--- a/fs/gfs2/lock_dlm.c
+++ b/fs/gfs2/lock_dlm.c
@@ -19,6 +19,8 @@
 
 #include "incore.h"
 #include "glock.h"
+#include "glops.h"
+#include "recovery.h"
 #include "util.h"
 #include "sys.h"
 #include "trace_gfs2.h"
@@ -325,6 +327,7 @@ static void gdlm_cancel(struct gfs2_glock *gl)
 /*
  * dlm/gfs2 recovery coordination using dlm_recover callbacks
  *
+ *  0. gfs2 checks for another cluster node withdraw, needing journal replay
  *  1. dlm_controld sees lockspace members change
  *  2. dlm_controld blocks dlm-kernel locking activity
  *  3. dlm_controld within dlm-kernel notifies gfs2 (recover_prep)
@@ -573,6 +576,56 @@ static int control_lock(struct gfs2_sbd *sdp, int mode, 
uint32_t flags)
                         &ls->ls_control_lksb, "control_lock");
 }
 
+/**
+ * remote_withdraw - react to a node withdrawing from the file system
+ * @sdp: The superblock
+ */
+static void remote_withdraw(struct gfs2_sbd *sdp)
+{
+       struct gfs2_jdesc *jd;
+       struct gfs2_glock *gl = sdp->sd_live_gh.gh_gl;
+       int ret, count = 0;
+
+       /* Dequeue the "live" glock, but keep a reference so it's never freed:
+        * LVBs only work on dlm conversions, not on new glocks. */
+       gfs2_glock_hold(gl);
+       set_bit(SDF_IN_RECOVERY, &sdp->sd_flags);
+       gfs2_glock_dq_wait(&sdp->sd_live_gh);
+
+       fs_info(sdp, "Reacquiring the live glock.\n");
+       /* Re-lock it EX to pick up the lvb bits */
+       gfs2_holder_reinit(LM_ST_EXCLUSIVE, LM_FLAG_NOEXP | GL_EXACT,
+                          &sdp->sd_live_gh);
+       ret = gfs2_glock_nq(&sdp->sd_live_gh);
+       if (ret) {
+               fs_err(sdp, "can't reacquire live glock EX: %d\n", ret);
+               goto out;
+       }
+
+       list_for_each_entry(jd, &sdp->sd_jindex_list, jd_list) {
+               fs_info(sdp, "Checking jid %d.\n", jd->jd_jid);
+               if (test_and_clear_bit_le(jd->jd_jid,
+                                         gl->gl_lksb.sb_lvbptr)) {
+                       count++;
+                       ret = gfs2_recover_journal(jd, true);
+                       if (ret)
+                               break;
+               }
+       }
+       gfs2_glock_dq_wait(&sdp->sd_live_gh);
+       gfs2_holder_reinit(LM_ST_SHARED, LM_FLAG_NOEXP | GL_EXACT,
+                          &sdp->sd_live_gh);
+       /* Re-lock it SH: back to business as usual. */
+       ret = gfs2_glock_nq(&sdp->sd_live_gh);
+       if (ret)
+               fs_err(sdp, "can't reacquire live glock SH: %d\n", ret);
+out:
+       clear_bit(SDF_IN_RECOVERY, &sdp->sd_flags);
+       /* Now drop the additional reference we acquired */
+       fs_err(sdp, "%d journals recovered: ret = %d.\n", count, ret);
+       gfs2_glock_queue_put(gl);
+}
+
 static void gfs2_control_func(struct work_struct *work)
 {
        struct gfs2_sbd *sdp = container_of(work, struct gfs2_sbd, 
sd_control_work.work);
@@ -583,6 +636,12 @@ static void gfs2_control_func(struct work_struct *work)
        int recover_size;
        int i, error;
 
+       /* First check for other nodes that may have done a withdraw. */
+       if (test_and_clear_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags)) {
+               remote_withdraw(sdp);
+               return;
+       }
+
        spin_lock(&ls->ls_recover_spin);
        /*
         * No MOUNT_DONE means we're still mounting; control_mount()
diff --git a/fs/gfs2/log.c b/fs/gfs2/log.c
index 5bfaf381921a..b28f7b998eee 100644
--- a/fs/gfs2/log.c
+++ b/fs/gfs2/log.c
@@ -92,8 +92,7 @@ static void gfs2_remove_from_ail(struct gfs2_bufdata *bd)
 
 static int gfs2_ail1_start_one(struct gfs2_sbd *sdp,
                               struct writeback_control *wbc,
-                              struct gfs2_trans *tr,
-                              bool *withdraw)
+                              struct gfs2_trans *tr)
 __releases(&sdp->sd_ail_lock)
 __acquires(&sdp->sd_ail_lock)
 {
@@ -110,10 +109,8 @@ __acquires(&sdp->sd_ail_lock)
                if (!buffer_busy(bh)) {
                        if (!buffer_uptodate(bh) &&
                            !test_and_set_bit(SDF_AIL1_IO_ERROR,
-                                             &sdp->sd_flags)) {
-                               gfs2_io_error_bh(sdp, bh);
-                               *withdraw = true;
-                       }
+                                             &sdp->sd_flags))
+                               gfs2_jrnl_io_error_bh(sdp, bh);
                        list_move(&bd->bd_ail_st_list, &tr->tr_ail2_list);
                        continue;
                }
@@ -153,7 +150,6 @@ void gfs2_ail1_flush(struct gfs2_sbd *sdp, struct 
writeback_control *wbc)
        struct list_head *head = &sdp->sd_ail1_list;
        struct gfs2_trans *tr;
        struct blk_plug plug;
-       bool withdraw = false;
 
        trace_gfs2_ail_flush(sdp, wbc, 1);
        blk_start_plug(&plug);
@@ -162,12 +158,15 @@ void gfs2_ail1_flush(struct gfs2_sbd *sdp, struct 
writeback_control *wbc)
        list_for_each_entry_reverse(tr, head, tr_list) {
                if (wbc->nr_to_write <= 0)
                        break;
-               if (gfs2_ail1_start_one(sdp, wbc, tr, &withdraw))
+               if (gfs2_ail1_start_one(sdp, wbc, tr))
                        goto restart;
        }
        spin_unlock(&sdp->sd_ail_lock);
        blk_finish_plug(&plug);
-       if (withdraw)
+       /* If gfs2_ail1_start_one sees an IO error or has an issue, it will
+        * set the JOURNAL_WITHDRAW bit but it won't withdraw. So we do that
+        * now after the whole list has been processed. */
+       if (test_bit(SDF_JOURNAL_WITHDRAW, &sdp->sd_flags))
                gfs2_lm_withdraw(sdp, NULL);
        trace_gfs2_ail_flush(sdp, wbc, 0);
 }
@@ -196,8 +195,7 @@ static void gfs2_ail1_start(struct gfs2_sbd *sdp)
  *
  */
 
-static void gfs2_ail1_empty_one(struct gfs2_sbd *sdp, struct gfs2_trans *tr,
-                               bool *withdraw)
+static void gfs2_ail1_empty_one(struct gfs2_sbd *sdp, struct gfs2_trans *tr)
 {
        struct gfs2_bufdata *bd, *s;
        struct buffer_head *bh;
@@ -210,8 +208,7 @@ static void gfs2_ail1_empty_one(struct gfs2_sbd *sdp, 
struct gfs2_trans *tr,
                        continue;
                if (!buffer_uptodate(bh) &&
                    !test_and_set_bit(SDF_AIL1_IO_ERROR, &sdp->sd_flags)) {
-                       gfs2_io_error_bh(sdp, bh);
-                       *withdraw = true;
+                       gfs2_jrnl_io_error_bh(sdp, bh);
                }
                list_move(&bd->bd_ail_st_list, &tr->tr_ail2_list);
        }
@@ -229,11 +226,10 @@ static int gfs2_ail1_empty(struct gfs2_sbd *sdp)
        struct gfs2_trans *tr, *s;
        int oldest_tr = 1;
        int ret;
-       bool withdraw = false;
 
        spin_lock(&sdp->sd_ail_lock);
        list_for_each_entry_safe_reverse(tr, s, &sdp->sd_ail1_list, tr_list) {
-               gfs2_ail1_empty_one(sdp, tr, &withdraw);
+               gfs2_ail1_empty_one(sdp, tr);
                if (list_empty(&tr->tr_ail1_list) && oldest_tr)
                        list_move(&tr->tr_list, &sdp->sd_ail2_list);
                else
@@ -242,7 +238,7 @@ static int gfs2_ail1_empty(struct gfs2_sbd *sdp)
        ret = list_empty(&sdp->sd_ail1_list);
        spin_unlock(&sdp->sd_ail_lock);
 
-       if (withdraw)
+       if (test_bit(SDF_JOURNAL_WITHDRAW, &sdp->sd_flags))
                gfs2_lm_withdraw(sdp, "fatal: I/O error(s)\n");
 
        return ret;
@@ -509,8 +505,8 @@ static void log_pull_tail(struct gfs2_sbd *sdp, unsigned 
int new_tail)
 
        atomic_add(dist, &sdp->sd_log_blks_free);
        trace_gfs2_log_blocks(sdp, dist);
-       gfs2_assert_withdraw(sdp, atomic_read(&sdp->sd_log_blks_free) <=
-                            sdp->sd_jdesc->jd_blocks);
+       gfs2_jrnl_assert_withdraw(sdp, atomic_read(&sdp->sd_log_blks_free) <=
+                                 sdp->sd_jdesc->jd_blocks);
 
        sdp->sd_log_tail = new_tail;
 }
@@ -682,12 +678,18 @@ void gfs2_write_log_header(struct gfs2_sbd *sdp, struct 
gfs2_jdesc *jd,
 {
        struct gfs2_log_header *lh;
        u32 hash, crc;
-       struct page *page = mempool_alloc(gfs2_page_pool, GFP_NOIO);
+       struct page *page;
        struct gfs2_statfs_change_host *l_sc = &sdp->sd_statfs_local;
        struct timespec64 tv;
        struct super_block *sb = sdp->sd_vfs;
        u64 addr;
 
+       if (test_bit(SDF_SHUTDOWN, &sdp->sd_flags) ||
+           test_bit(SDF_AIL1_IO_ERROR, &sdp->sd_flags)) {
+               log_flush_wait(sdp);
+               return;
+       }
+       page = mempool_alloc(gfs2_page_pool, GFP_NOIO);
        lh = page_address(page);
        clear_page(lh);
 
@@ -800,12 +802,13 @@ void gfs2_log_flush(struct gfs2_sbd *sdp, struct 
gfs2_glock *gl, u32 flags)
                INIT_LIST_HEAD(&tr->tr_ail2_list);
                tr->tr_first = sdp->sd_log_flush_head;
                if (unlikely (state == SFS_FROZEN))
-                       gfs2_assert_withdraw(sdp, !tr->tr_num_buf_new && 
!tr->tr_num_databuf_new);
+                       gfs2_jrnl_assert_withdraw(sdp, !tr->tr_num_buf_new &&
+                                                 !tr->tr_num_databuf_new);
        }
 
        if (unlikely(state == SFS_FROZEN))
-               gfs2_assert_withdraw(sdp, !sdp->sd_log_num_revoke);
-       gfs2_assert_withdraw(sdp,
+               gfs2_jrnl_assert_withdraw(sdp, !sdp->sd_log_num_revoke);
+       gfs2_jrnl_assert_withdraw(sdp,
                        sdp->sd_log_num_revoke == sdp->sd_log_commited_revoke);
 
        gfs2_ordered_write(sdp);
@@ -893,7 +896,8 @@ static void log_refund(struct gfs2_sbd *sdp, struct 
gfs2_trans *tr)
        if (sdp->sd_log_tr) {
                gfs2_merge_trans(sdp->sd_log_tr, tr);
        } else if (tr->tr_num_buf_new || tr->tr_num_databuf_new) {
-               gfs2_assert_withdraw(sdp, test_bit(TR_ALLOCED, &tr->tr_flags));
+               gfs2_jrnl_assert_withdraw(sdp, test_bit(TR_ALLOCED,
+                                                       &tr->tr_flags));
                sdp->sd_log_tr = tr;
                set_bit(TR_ATTACHED, &tr->tr_flags);
        }
@@ -901,12 +905,12 @@ static void log_refund(struct gfs2_sbd *sdp, struct 
gfs2_trans *tr)
        sdp->sd_log_commited_revoke += tr->tr_num_revoke - tr->tr_num_revoke_rm;
        reserved = calc_reserved(sdp);
        maxres = sdp->sd_log_blks_reserved + tr->tr_reserved;
-       gfs2_assert_withdraw(sdp, maxres >= reserved);
+       gfs2_jrnl_assert_withdraw(sdp, maxres >= reserved);
        unused = maxres - reserved;
        atomic_add(unused, &sdp->sd_log_blks_free);
        trace_gfs2_log_blocks(sdp, unused);
-       gfs2_assert_withdraw(sdp, atomic_read(&sdp->sd_log_blks_free) <=
-                            sdp->sd_jdesc->jd_blocks);
+       gfs2_jrnl_assert_withdraw(sdp, atomic_read(&sdp->sd_log_blks_free) <=
+                                 sdp->sd_jdesc->jd_blocks);
        sdp->sd_log_blks_reserved = reserved;
 
        gfs2_log_unlock(sdp);
@@ -945,9 +949,9 @@ void gfs2_log_commit(struct gfs2_sbd *sdp, struct 
gfs2_trans *tr)
 
 void gfs2_log_shutdown(struct gfs2_sbd *sdp)
 {
-       gfs2_assert_withdraw(sdp, !sdp->sd_log_blks_reserved);
-       gfs2_assert_withdraw(sdp, !sdp->sd_log_num_revoke);
-       gfs2_assert_withdraw(sdp, list_empty(&sdp->sd_ail1_list));
+       gfs2_jrnl_assert_withdraw(sdp, !sdp->sd_log_blks_reserved);
+       gfs2_jrnl_assert_withdraw(sdp, !sdp->sd_log_num_revoke);
+       gfs2_jrnl_assert_withdraw(sdp, list_empty(&sdp->sd_ail1_list));
 
        sdp->sd_log_flush_head = sdp->sd_log_head;
 
diff --git a/fs/gfs2/super.c b/fs/gfs2/super.c
index d4b11c903971..35a2a8b6f427 100644
--- a/fs/gfs2/super.c
+++ b/fs/gfs2/super.c
@@ -845,7 +845,7 @@ static void gfs2_dirty_inode(struct inode *inode, int flags)
  * Returns: errno
  */
 
-static int gfs2_make_fs_ro(struct gfs2_sbd *sdp)
+int gfs2_make_fs_ro(struct gfs2_sbd *sdp)
 {
        struct gfs2_holder freeze_gh;
        int error;
@@ -923,7 +923,8 @@ static void gfs2_put_super(struct super_block *sb)
 
        if (!sdp->sd_args.ar_spectator) {
                gfs2_glock_dq_uninit(&sdp->sd_journal_gh);
-               gfs2_glock_dq_uninit(&sdp->sd_jinode_gh);
+               if (gfs2_holder_initialized(&sdp->sd_jinode_gh))
+                       gfs2_glock_dq_uninit(&sdp->sd_jinode_gh);
                gfs2_glock_dq_uninit(&sdp->sd_sc_gh);
                gfs2_glock_dq_uninit(&sdp->sd_qc_gh);
                iput(sdp->sd_sc_inode);
diff --git a/fs/gfs2/super.h b/fs/gfs2/super.h
index 73c97dccae21..aa401752da15 100644
--- a/fs/gfs2/super.h
+++ b/fs/gfs2/super.h
@@ -46,6 +46,7 @@ extern void update_statfs(struct gfs2_sbd *sdp, struct 
buffer_head *m_bh,
                          struct buffer_head *l_bh);
 extern int gfs2_statfs_sync(struct super_block *sb, int type);
 extern void gfs2_freeze_func(struct work_struct *work);
+extern int gfs2_make_fs_ro(struct gfs2_sbd *sdp);
 
 extern struct file_system_type gfs2_fs_type;
 extern struct file_system_type gfs2meta_fs_type;
diff --git a/fs/gfs2/util.c b/fs/gfs2/util.c
index 0a814ccac41d..0e3753175c68 100644
--- a/fs/gfs2/util.c
+++ b/fs/gfs2/util.c
@@ -14,12 +14,15 @@
 #include <linux/buffer_head.h>
 #include <linux/crc32.h>
 #include <linux/gfs2_ondisk.h>
+#include <linux/delay.h>
 #include <linux/uaccess.h>
 
 #include "gfs2.h"
 #include "incore.h"
 #include "glock.h"
+#include "log.h"
 #include "rgrp.h"
+#include "super.h"
 #include "util.h"
 
 struct kmem_cache *gfs2_glock_cachep __read_mostly;
@@ -36,6 +39,69 @@ void gfs2_assert_i(struct gfs2_sbd *sdp)
        fs_emerg(sdp, "fatal assertion failed\n");
 }
 
+static void signal_our_withdraw(struct gfs2_sbd *sdp)
+{
+       struct gfs2_glock *gl = sdp->sd_live_gh.gh_gl;
+       int ret;
+
+       /**
+        * Don't tell dlm we're bailing until we have no more buffers in the
+        * wind. If journal had an IO error, the log code should just purge
+        * the outstanding buffers rather than submitting new IO. Making the
+        * file system read-only will flush the journal, etc.
+        */
+       fs_err(sdp, "Making the file system read-only.\n");
+       gfs2_make_fs_ro(sdp);
+       sdp->sd_vfs->s_flags |= MS_RDONLY;
+
+       /* Drop the glock for our journal so another node can recover it. */
+       fs_err(sdp, "Dropping our journal glock to allow recovery.\n");
+       gfs2_glock_dq_uninit(&sdp->sd_jinode_gh);
+
+       /**
+        * Dequeue the "live" glock, but keep a reference so it's never freed:
+        * LVBs only work on dlm conversions, not on new glocks.
+        */
+       gfs2_glock_hold(gl);
+       gfs2_glock_dq_wait(&sdp->sd_live_gh);
+       /**
+        * We enqueue the "live" glock in EX so that all other nodes
+        * get a demote request and act on it, demoting their glock
+        * from SHARED to UNLOCKED. Once we have the glock in EX, we
+        * know all other nodes have been informed of our departure.
+        * They cannot do anything more until our journal has been
+        * replayed and our locks released.
+        *
+        * We set a bit in the glock's lvb indicating which journal
+        * needs to be replayed, but we can't recover it ourselves
+        * because the withdraw might have been due to an io error, etc.
+        */
+       fs_warn(sdp, "Requesting recovery of jid %d.\n",
+               sdp->sd_lockstruct.ls_jid);
+       gfs2_holder_reinit(LM_ST_EXCLUSIVE, LM_FLAG_NOEXP | GL_EXACT,
+                          &sdp->sd_live_gh);
+       ret = gfs2_glock_nq(&sdp->sd_live_gh);
+       if (ret) {
+               fs_err(sdp, "can't get live glock EX: %d\n", ret);
+               return;
+       }
+       __set_bit_le(sdp->sd_lockstruct.ls_jid, gl->gl_lksb.sb_lvbptr);
+       gfs2_glock_dq_wait(&sdp->sd_live_gh);
+       /* Here we sleep so that the other node can grab the lvb we wrote when
+        * the glock is demoted to UN. If we go too fast, dlm might just
+        * do a conversion from EX to SH which seems to destroy the lvb. */
+       msleep(100);
+       /* Drop the EX lock so the lvb gets written. Then reacquire in SH. */
+       gfs2_holder_reinit(LM_ST_SHARED, LM_FLAG_NOEXP | GL_EXACT,
+                          &sdp->sd_live_gh);
+       ret = gfs2_glock_nq(&sdp->sd_live_gh);
+       if (ret)
+               fs_err(sdp, "Can't reacquire live glock SH: %d\n", ret);
+
+       /* Now drop the additional reference we acquired */
+       gfs2_glock_queue_put(gl);
+}
+
 int gfs2_lm_withdraw(struct gfs2_sbd *sdp, const char *fmt, ...)
 {
        struct lm_lockstruct *ls = &sdp->sd_lockstruct;
@@ -62,6 +128,8 @@ int gfs2_lm_withdraw(struct gfs2_sbd *sdp, const char *fmt, 
...)
                fs_err(sdp, "about to withdraw this file system\n");
                BUG_ON(sdp->sd_args.ar_debug);
 
+               signal_our_withdraw(sdp);
+
                kobject_uevent(&sdp->sd_kobj, KOBJ_OFFLINE);
 
                if (!strcmp(sdp->sd_lockstruct.ls_ops->lm_proto_name, 
"lock_dlm"))
@@ -100,6 +168,15 @@ int gfs2_assert_withdraw_i(struct gfs2_sbd *sdp, char 
*assertion,
        return (me) ? -1 : -2;
 }
 
+int gfs2_assert_withdraw_j(struct gfs2_sbd *sdp, char *assertion,
+                          const char *function, char *file, unsigned int line)
+{
+       if (!test_and_set_bit(SDF_JOURNAL_WITHDRAW, &sdp->sd_flags))
+               return gfs2_assert_withdraw_i(sdp, assertion, function, file,
+                                             line);
+       return -2;
+}
+
 /**
  * gfs2_assert_warn_i - Print a message to the console if @assertion is false
  * Returns: -1 if we printed something
@@ -270,3 +347,10 @@ void gfs2_io_error_bh_i(struct gfs2_sbd *sdp, struct 
buffer_head *bh,
                gfs2_lm_withdraw(sdp, NULL);
 }
 
+void gfs2_jrnl_io_error_bh_i(struct gfs2_sbd *sdp, struct buffer_head *bh,
+                            const char *function, char *file,
+                            unsigned int line, bool withdraw)
+{
+       if (!test_and_set_bit(SDF_JOURNAL_WITHDRAW, &sdp->sd_flags))
+               gfs2_io_error_bh_i(sdp, bh, function, file, line, withdraw);
+}
diff --git a/fs/gfs2/util.h b/fs/gfs2/util.h
index 9278fecba632..336ad543efe1 100644
--- a/fs/gfs2/util.h
+++ b/fs/gfs2/util.h
@@ -41,11 +41,17 @@ do { \
 
 int gfs2_assert_withdraw_i(struct gfs2_sbd *sdp, char *assertion,
                           const char *function, char *file, unsigned int line);
+int gfs2_assert_withdraw_j(struct gfs2_sbd *sdp, char *assertion,
+                          const char *function, char *file, unsigned int line);
 
 #define gfs2_assert_withdraw(sdp, assertion) \
 ((likely(assertion)) ? 0 : gfs2_assert_withdraw_i((sdp), #assertion, \
                                        __func__, __FILE__, __LINE__))
 
+#define gfs2_jrnl_assert_withdraw(sdp, assertion) \
+((likely(assertion)) ? 0 : gfs2_assert_withdraw_j((sdp), #assertion, \
+                                       __func__, __FILE__, __LINE__))
+
 
 int gfs2_assert_warn_i(struct gfs2_sbd *sdp, char *assertion,
                       const char *function, char *file, unsigned int line);
@@ -140,12 +146,19 @@ void gfs2_io_error_bh_i(struct gfs2_sbd *sdp, struct 
buffer_head *bh,
                        const char *function, char *file, unsigned int line,
                        bool withdraw);
 
+void gfs2_jrnl_io_error_bh_i(struct gfs2_sbd *sdp, struct buffer_head *bh,
+                            const char *function, char *file,
+                            unsigned int line, bool withdraw);
+
 #define gfs2_io_error_bh_wd(sdp, bh) \
 gfs2_io_error_bh_i((sdp), (bh), __func__, __FILE__, __LINE__, true);
 
 #define gfs2_io_error_bh(sdp, bh) \
 gfs2_io_error_bh_i((sdp), (bh), __func__, __FILE__, __LINE__, false);
 
+#define gfs2_jrnl_io_error_bh(sdp, bh) \
+gfs2_jrnl_io_error_bh_i((sdp), (bh), __func__, __FILE__, __LINE__, false);
+
 
 extern struct kmem_cache *gfs2_glock_cachep;
 extern struct kmem_cache *gfs2_glock_aspace_cachep;
-- 
2.19.1

Reply via email to