The current dlm version detection is very complex due backwards
compatiblilty with earlier dlm protocol versions. It took some time to
detect if a peer node has a specific DLM version, if it's not known we
just cut the socket connection. Now there could be cases where the node
didn't detected the version field yet but the peer node detected it and
we are trying to shutdown the dlm connection with a specific mechanism
to avoid synchronization issue when pending cluster lockspace
memberships are still on wire.

To make it more robust we introcude a "best effort" wait to wait for the
version detection before shutdown the dlm connection. This need to be
done before the kthread recoverd for recovery handling is stopped,
because recovery handling will trigger enough messages to have a version
detection going on.

It is a corner case which was detected by modprobe dlm_locktroture module
and rmmod dlm_locktroture module directly afterwards (in a looping
behaviour). In practice probably nobody would leave a lockspace immediately
after joining it.

Signed-off-by: Alexander Aring <aahri...@redhat.com>
---
 fs/dlm/lockspace.c |  3 +++
 fs/dlm/midcomms.c  | 23 +++++++++++++++++++++++
 fs/dlm/midcomms.h  |  1 +
 3 files changed, 27 insertions(+)

diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 99bc96f90779..d9dc0b734002 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -820,6 +820,9 @@ static int release_lockspace(struct dlm_ls *ls, int force)
                return rv;
        }
 
+       if (ls_count == 1)
+               dlm_midcomms_version_wait();
+
        dlm_device_deregister(ls);
 
        if (force < 3 && dlm_user_daemon_available())
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index dbc998b2748b..cf91a5a11b4f 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -657,6 +657,7 @@ static int dlm_midcomms_version_check_3_2(struct 
midcomms_node *node)
        switch (node->version) {
        case DLM_VERSION_NOT_SET:
                node->version = DLM_VERSION_3_2;
+               wake_up(&node->shutdown_wait);
                log_print("version 0x%08x for node %d detected", 
DLM_VERSION_3_2,
                          node->nodeid);
                break;
@@ -826,6 +827,7 @@ static int dlm_midcomms_version_check_3_1(struct 
midcomms_node *node)
        switch (node->version) {
        case DLM_VERSION_NOT_SET:
                node->version = DLM_VERSION_3_1;
+               wake_up(&node->shutdown_wait);
                log_print("version 0x%08x for node %d detected", 
DLM_VERSION_3_1,
                          node->nodeid);
                break;
@@ -1386,6 +1388,27 @@ static void midcomms_node_release(struct rcu_head *rcu)
        kfree(node);
 }
 
+void dlm_midcomms_version_wait(void)
+{
+       struct midcomms_node *node;
+       int i, idx, ret;
+
+       idx = srcu_read_lock(&nodes_srcu);
+       for (i = 0; i < CONN_HASH_SIZE; i++) {
+               hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
+                       ret = wait_event_timeout(node->shutdown_wait,
+                                                node->version != 
DLM_VERSION_NOT_SET ||
+                                                node->state == DLM_CLOSED ||
+                                                test_bit(DLM_NODE_FLAG_CLOSE, 
&node->flags),
+                                                DLM_SHUTDOWN_TIMEOUT);
+                       if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags))
+                               pr_debug("version wait timed out for node %d 
with state %s\n",
+                                        node->nodeid, 
dlm_state_str(node->state));
+               }
+       }
+       srcu_read_unlock(&nodes_srcu, idx);
+}
+
 static void midcomms_shutdown(struct midcomms_node *node)
 {
        int ret;
diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h
index bea1cee4279c..9f8c9605013d 100644
--- a/fs/dlm/midcomms.h
+++ b/fs/dlm/midcomms.h
@@ -20,6 +20,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int 
len,
                                             gfp_t allocation, char **ppc);
 void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name,
                                 int namelen);
+void dlm_midcomms_version_wait(void);
 int dlm_midcomms_close(int nodeid);
 int dlm_midcomms_start(void);
 void dlm_midcomms_stop(void);
-- 
2.31.1

Reply via email to