Hi,

On Thu, Jul 27, 2023 at 9:23 AM Alexander Aring <aahri...@redhat.com> wrote:
>
> This patch puts the life of a midcomms node the same as a lowcomms
> connection. The lowcomms connection lifetime was changed by commit
> 6f0b0b5d7ae7 ("fs: dlm: remove dlm_node_addrs lookup list"). In the
> future the midcomms node instances can be merged with lowcomms
> connection structure as the lifetime is the same and states can be
> controlled over values or flags.
>
> Before midcomms nodes were generated during version detection. This is
> not necessary anymore when the nodes are created when the cluster
> manager configures DLM via configfs. When a midcomms node is created over
> configfs it well set DLM_VERSION_NOT_SET as version. This indicates that
> the version of the midcomms node is still unknown and need to be probed
> via certain rcom messages.
>
> Signed-off-by: Alexander Aring <aahri...@redhat.com>
> ---
>  fs/dlm/config.c   |   2 +-
>  fs/dlm/midcomms.c | 286 +++++++++++++++++-----------------------------
>  fs/dlm/midcomms.h |   1 +
>  3 files changed, 110 insertions(+), 179 deletions(-)
>
> diff --git a/fs/dlm/config.c b/fs/dlm/config.c
> index 2beceff024e3..e55e0a2cd2e8 100644
> --- a/fs/dlm/config.c
> +++ b/fs/dlm/config.c
> @@ -664,7 +664,7 @@ static ssize_t comm_addr_store(struct config_item *item, 
> const char *buf,
>
>         memcpy(addr, buf, len);
>
> -       rv = dlm_lowcomms_addr(cm->nodeid, addr, len);
> +       rv = dlm_midcomms_addr(cm->nodeid, addr, len);
>         if (rv) {
>                 kfree(addr);
>                 return rv;
> diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
> index c125496e03bf..24952ecf8e1a 100644
> --- a/fs/dlm/midcomms.c
> +++ b/fs/dlm/midcomms.c
> @@ -330,18 +330,23 @@ static void midcomms_node_reset(struct midcomms_node 
> *node)
>         wake_up(&node->shutdown_wait);
>  }
>
> -static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
> +static struct midcomms_node *nodeid2node(int nodeid)
>  {
> -       struct midcomms_node *node, *tmp;
> -       int r = nodeid_hash(nodeid);
> +       return __find_node(nodeid, nodeid_hash(nodeid));
> +}
> +
> +int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
> +{
> +       int ret, r = nodeid_hash(nodeid);
> +       struct midcomms_node *node;
>
> -       node = __find_node(nodeid, r);
> -       if (node || !alloc)
> -               return node;
> +       ret = dlm_lowcomms_addr(nodeid, addr, len);
> +       if (ret)
> +               return ret;
>
> -       node = kmalloc(sizeof(*node), alloc);
> +       node = kmalloc(sizeof(*node), GFP_NOFS);
>         if (!node)
> -               return NULL;
> +               return -ENOMEM;
>
>         node->nodeid = nodeid;
>         spin_lock_init(&node->state_lock);
> @@ -353,21 +358,11 @@ static struct midcomms_node *nodeid2node(int nodeid, 
> gfp_t alloc)
>         midcomms_node_reset(node);
>
>         spin_lock(&nodes_lock);
> -       /* check again if there was somebody else
> -        * earlier here to add the node
> -        */
> -       tmp = __find_node(nodeid, r);
> -       if (tmp) {
> -               spin_unlock(&nodes_lock);
> -               kfree(node);
> -               return tmp;
> -       }
> -
>         hlist_add_head_rcu(&node->hlist, &node_hash[r]);
>         spin_unlock(&nodes_lock);
>
>         node->debugfs = dlm_create_debug_comms_file(nodeid, node);
> -       return node;
> +       return 0;
>  }
>
>  static int dlm_send_ack(int nodeid, uint32_t seq)
> @@ -603,112 +598,6 @@ static void dlm_midcomms_receive_buffer(const union 
> dlm_packet *p,
>         }
>  }
>
> -static struct midcomms_node *
> -dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p,
> -                             uint16_t msglen, int (*cb)(struct midcomms_node 
> *node))
> -{
> -       struct midcomms_node *node = NULL;
> -       gfp_t allocation = 0;
> -       int ret;
> -
> -       switch (p->header.h_cmd) {
> -       case DLM_RCOM:
> -               if (msglen < sizeof(struct dlm_rcom)) {
> -                       log_print("rcom msg too small: %u, will skip this 
> message from node %d",
> -                                 msglen, nodeid);
> -                       return NULL;
> -               }
> -
> -               switch (p->rcom.rc_type) {
> -               case cpu_to_le32(DLM_RCOM_NAMES):
> -                       fallthrough;
> -               case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
> -                       fallthrough;
> -               case cpu_to_le32(DLM_RCOM_STATUS):
> -                       fallthrough;
> -               case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
> -                       node = nodeid2node(nodeid, 0);
> -                       if (node) {
> -                               spin_lock(&node->state_lock);
> -                               if (node->state != DLM_ESTABLISHED)
> -                                       pr_debug("receive begin RCOM msg from 
> node %d with state %s\n",
> -                                                node->nodeid, 
> dlm_state_str(node->state));
> -
> -                               switch (node->state) {
> -                               case DLM_CLOSED:
> -                                       node->state = DLM_ESTABLISHED;
> -                                       pr_debug("switch node %d to state 
> %s\n",
> -                                                node->nodeid, 
> dlm_state_str(node->state));
> -                                       break;
> -                               case DLM_ESTABLISHED:
> -                                       break;
> -                               default:
> -                                       spin_unlock(&node->state_lock);
> -                                       return NULL;
> -                               }
> -                               spin_unlock(&node->state_lock);
> -                       }
> -
> -                       allocation = GFP_NOFS;
> -                       break;
> -               default:
> -                       break;
> -               }
> -
> -               break;
> -       default:
> -               break;
> -       }
> -
> -       node = nodeid2node(nodeid, allocation);
> -       if (!node) {
> -               switch (p->header.h_cmd) {
> -               case DLM_OPTS:
> -                       if (msglen < sizeof(struct dlm_opts)) {
> -                               log_print("opts msg too small: %u, will skip 
> this message from node %d",
> -                                         msglen, nodeid);
> -                               return NULL;
> -                       }
> -
> -                       log_print_ratelimited("received dlm opts message 
> nextcmd %d from node %d in an invalid sequence",
> -                                             p->opts.o_nextcmd, nodeid);
> -                       break;
> -               default:
> -                       log_print_ratelimited("received dlm message cmd %d 
> from node %d in an invalid sequence",
> -                                             p->header.h_cmd, nodeid);
> -                       break;
> -               }
> -
> -               return NULL;
> -       }
> -
> -       ret = cb(node);
> -       if (ret < 0)
> -               return NULL;
> -
> -       return node;
> -}
> -
> -static int dlm_midcomms_version_check_3_2(struct midcomms_node *node)
> -{
> -       switch (node->version) {
> -       case DLM_VERSION_NOT_SET:
> -               node->version = DLM_VERSION_3_2;
> -               wake_up(&node->shutdown_wait);
> -               log_print("version 0x%08x for node %d detected", 
> DLM_VERSION_3_2,
> -                         node->nodeid);
> -               break;
> -       case DLM_VERSION_3_2:
> -               break;
> -       default:
> -               log_print_ratelimited("version mismatch detected, assumed 
> 0x%08x but node %d has 0x%08x",
> -                                     DLM_VERSION_3_2, node->nodeid, 
> node->version);
> -               return -1;
> -       }
> -
> -       return 0;
> -}
> -
>  static int dlm_opts_check_msglen(const union dlm_packet *p, uint16_t msglen,
>                                  int nodeid)
>  {
> @@ -767,10 +656,37 @@ static void dlm_midcomms_receive_buffer_3_2(const union 
> dlm_packet *p, int nodei
>         int ret, idx;
>
>         idx = srcu_read_lock(&nodes_srcu);
> -       node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
> -                                            dlm_midcomms_version_check_3_2);
> -       if (!node)
> +       node = nodeid2node(nodeid);
> +       if (WARN_ON_ONCE(!node))
> +               goto out;
> +
> +       switch (node->version) {
> +       case DLM_VERSION_NOT_SET:
> +               node->version = DLM_VERSION_3_2;
> +               wake_up(&node->shutdown_wait);
> +               log_print("version 0x%08x for node %d detected", 
> DLM_VERSION_3_2,
> +                         node->nodeid);
> +
> +               spin_lock(&node->state_lock);
> +               switch (node->state) {
> +               case DLM_CLOSED:
> +                       node->state = DLM_ESTABLISHED;
> +                       pr_debug("switch node %d to state %s\n",
> +                                node->nodeid, dlm_state_str(node->state));
> +                       break;
> +               default:
> +                       break;
> +               }
> +               spin_unlock(&node->state_lock);
> +
> +               break;
> +       case DLM_VERSION_3_2:
> +               break;
> +       default:
> +               log_print_ratelimited("version mismatch detected, assumed 
> 0x%08x but node %d has 0x%08x",
> +                                     DLM_VERSION_3_2, node->nodeid, 
> node->version);
>                 goto out;
> +       }
>
>         switch (p->header.h_cmd) {
>         case DLM_RCOM:
> @@ -860,8 +776,19 @@ static void dlm_midcomms_receive_buffer_3_2(const union 
> dlm_packet *p, int nodei
>         srcu_read_unlock(&nodes_srcu, idx);
>  }
>
> -static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
> +static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int 
> nodeid)
>  {
> +       uint16_t msglen = le16_to_cpu(p->header.h_length);
> +       struct midcomms_node *node;
> +       int idx;
> +
> +       idx = srcu_read_lock(&nodes_srcu);
> +       node = nodeid2node(nodeid);
> +       if (WARN_ON_ONCE(!node)) {
> +               srcu_read_unlock(&nodes_srcu, idx);
> +               return;
> +       }
> +
>         switch (node->version) {
>         case DLM_VERSION_NOT_SET:
>                 node->version = DLM_VERSION_3_1;
> @@ -874,22 +801,6 @@ static int dlm_midcomms_version_check_3_1(struct 
> midcomms_node *node)
>         default:
>                 log_print_ratelimited("version mismatch detected, assumed 
> 0x%08x but node %d has 0x%08x",
>                                       DLM_VERSION_3_1, node->nodeid, 
> node->version);
> -               return -1;
> -       }
> -
> -       return 0;
> -}
> -
> -static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int 
> nodeid)
> -{
> -       uint16_t msglen = le16_to_cpu(p->header.h_length);
> -       struct midcomms_node *node;
> -       int idx;
> -
> -       idx = srcu_read_lock(&nodes_srcu);
> -       node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
> -                                            dlm_midcomms_version_check_3_1);
> -       if (!node) {
>                 srcu_read_unlock(&nodes_srcu, idx);
>                 return;
>         }
> @@ -1005,8 +916,8 @@ void dlm_midcomms_unack_msg_resend(int nodeid)
>         int idx, ret;
>
>         idx = srcu_read_lock(&nodes_srcu);
> -       node = nodeid2node(nodeid, 0);
> -       if (!node) {
> +       node = nodeid2node(nodeid);
> +       if (WARN_ON_ONCE(!node)) {
>                 srcu_read_unlock(&nodes_srcu, idx);
>                 return;
>         }
> @@ -1092,11 +1003,9 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int 
> nodeid, int len,
>         int idx;
>
>         idx = srcu_read_lock(&nodes_srcu);
> -       node = nodeid2node(nodeid, 0);
> -       if (!node) {
> -               WARN_ON_ONCE(1);
> +       node = nodeid2node(nodeid);
> +       if (WARN_ON_ONCE(!node))
>                 goto err;
> -       }
>
>         /* this is a bug, however we going on and hope it will be resolved */
>         WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
> @@ -1237,8 +1146,34 @@ void dlm_midcomms_init(void)
>         dlm_lowcomms_init();
>  }
>
> +static void midcomms_node_release(struct rcu_head *rcu)
> +{
> +       struct midcomms_node *node = container_of(rcu, struct midcomms_node, 
> rcu);
> +
> +       WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
> +       dlm_send_queue_flush(node);
> +       kfree(node);
> +}
> +
>  void dlm_midcomms_exit(void)
>  {
> +       struct midcomms_node *node;
> +       int i, idx;
> +
> +       idx = srcu_read_lock(&nodes_srcu);
> +       for (i = 0; i < CONN_HASH_SIZE; i++) {
> +               hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
> +                       dlm_delete_debug_comms_file(node->debugfs);
> +
> +                       spin_lock(&nodes_lock);
> +                       hlist_del_rcu(&node->hlist);
> +                       spin_unlock(&nodes_lock);
> +
> +                       call_srcu(&nodes_srcu, &node->rcu, 
> midcomms_node_release);
> +               }
> +       }
> +       srcu_read_unlock(&nodes_srcu, idx);
> +
>         dlm_lowcomms_exit();
>  }
>
> @@ -1279,8 +1214,8 @@ void dlm_midcomms_add_member(int nodeid)
>         int idx;
>
>         idx = srcu_read_lock(&nodes_srcu);
> -       node = nodeid2node(nodeid, GFP_NOFS);
> -       if (!node) {
> +       node = nodeid2node(nodeid);
> +       if (WARN_ON_ONCE(!node)) {
>                 srcu_read_unlock(&nodes_srcu, idx);
>                 return;
>         }
> @@ -1324,8 +1259,8 @@ void dlm_midcomms_remove_member(int nodeid)
>         int idx;
>
>         idx = srcu_read_lock(&nodes_srcu);
> -       node = nodeid2node(nodeid, 0);
> -       if (!node) {
> +       node = nodeid2node(nodeid);
> +       if (WARN_ON_ONCE(!node)) {
>                 srcu_read_unlock(&nodes_srcu, idx);
>                 return;
>         }
> @@ -1369,15 +1304,6 @@ void dlm_midcomms_remove_member(int nodeid)
>         srcu_read_unlock(&nodes_srcu, idx);
>  }
>
> -static void midcomms_node_release(struct rcu_head *rcu)
> -{
> -       struct midcomms_node *node = container_of(rcu, struct midcomms_node, 
> rcu);
> -
> -       WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
> -       dlm_send_queue_flush(node);
> -       kfree(node);
> -}
> -
>  void dlm_midcomms_version_wait(void)
>  {
>         struct midcomms_node *node;
> @@ -1440,7 +1366,7 @@ static void midcomms_shutdown(struct midcomms_node 
> *node)
>                                  node->state == DLM_CLOSED ||
>                                  test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
>                                  DLM_SHUTDOWN_TIMEOUT);
> -       if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags))
> +       if (!ret)
>                 pr_debug("active shutdown timed out for node %d with state 
> %s\n",
>                          node->nodeid, dlm_state_str(node->state));
>         else
> @@ -1458,14 +1384,6 @@ void dlm_midcomms_shutdown(void)
>         for (i = 0; i < CONN_HASH_SIZE; i++) {
>                 hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
>                         midcomms_shutdown(node);
> -
> -                       dlm_delete_debug_comms_file(node->debugfs);
> -
> -                       spin_lock(&nodes_lock);
> -                       hlist_del_rcu(&node->hlist);
> -                       spin_unlock(&nodes_lock);
> -
> -                       call_srcu(&nodes_srcu, &node->rcu, 
> midcomms_node_release);
>                 }
>         }
>         srcu_read_unlock(&nodes_srcu, idx);
> @@ -1481,7 +1399,7 @@ int dlm_midcomms_close(int nodeid)
>
>         idx = srcu_read_lock(&nodes_srcu);
>         /* Abort pending close/remove operation */
> -       node = nodeid2node(nodeid, 0);
> +       node = nodeid2node(nodeid);
>         if (node) {
>                 /* let shutdown waiters leave */
>                 set_bit(DLM_NODE_FLAG_CLOSE, &node->flags);
> @@ -1493,7 +1411,7 @@ int dlm_midcomms_close(int nodeid)
>
>         mutex_lock(&close_lock);
>         idx = srcu_read_lock(&nodes_srcu);
> -       node = nodeid2node(nodeid, 0);
> +       node = nodeid2node(nodeid);
>         if (!node) {
>                 srcu_read_unlock(&nodes_srcu, idx);
>                 mutex_unlock(&close_lock);
> @@ -1501,9 +1419,21 @@ int dlm_midcomms_close(int nodeid)
>         }
>
>         ret = dlm_lowcomms_close(nodeid);
> -       spin_lock(&node->state_lock);
> -       midcomms_node_reset(node);
> -       spin_unlock(&node->state_lock);
> +       dlm_delete_debug_comms_file(node->debugfs);
> +
> +       spin_lock(&nodes_lock);
> +       hlist_del_rcu(&node->hlist);
> +       spin_unlock(&nodes_lock);
> +
> +       /* wait that all readers left until flush send queue */
> +       synchronize_srcu(&nodes_srcu);
> +

This will obviously deadlock when holding the nodes_srcu read lock. I
have a small fix for it by moving srcu_read_unlock(&nodes_srcu, idx);
right after we removed the node from the hash.

- Alex

Reply via email to