Currently seq_next is only be read on the receive side which processed
in an ordered way. The seq_send is being protected by locks. To being
able to read the seq_next value on send side as well we convert it to an
atomic_t value. The atomic_cmpxchg() is probably not necessary, however
the atomic_inc() depends on a if coniditional and this should be handled
in an atomic context.

Signed-off-by: Alexander Aring <aahri...@redhat.com>
---
 fs/dlm/midcomms.c | 40 +++++++++++++++++++++++++---------------
 1 file changed, 25 insertions(+), 15 deletions(-)

diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 9c66cb853d17..70286737eb0a 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -152,8 +152,8 @@
 struct midcomms_node {
        int nodeid;
        uint32_t version;
-       uint32_t seq_send;
-       uint32_t seq_next;
+       atomic_t seq_send;
+       atomic_t seq_next;
        /* These queues are unbound because we cannot drop any message in dlm.
         * We could send a fence signal for a specific node to the cluster
         * manager if queues hits some maximum value, however this handling
@@ -317,8 +317,8 @@ static void midcomms_node_reset(struct midcomms_node *node)
 {
        pr_debug("reset node %d\n", node->nodeid);
 
-       node->seq_next = DLM_SEQ_INIT;
-       node->seq_send = DLM_SEQ_INIT;
+       atomic_set(&node->seq_next, DLM_SEQ_INIT);
+       atomic_set(&node->seq_send, DLM_SEQ_INIT);
        node->version = DLM_VERSION_NOT_SET;
        node->flags = 0;
 
@@ -492,9 +492,19 @@ static void dlm_midcomms_receive_buffer(union dlm_packet 
*p,
                                        struct midcomms_node *node,
                                        uint32_t seq)
 {
-       if (seq == node->seq_next) {
-               node->seq_next++;
+       bool is_expected_seq;
+       uint32_t oval, nval;
 
+       do {
+               oval = atomic_read(&node->seq_next);
+               is_expected_seq = (oval == seq);
+               if (!is_expected_seq)
+                       break;
+
+               nval = oval + 1;
+       } while (atomic_cmpxchg(&node->seq_next, oval, nval) != oval);
+
+       if (is_expected_seq) {
                switch (p->header.h_cmd) {
                case DLM_FIN:
                        spin_lock(&node->state_lock);
@@ -503,7 +513,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
 
                        switch (node->state) {
                        case DLM_ESTABLISHED:
-                               dlm_send_ack(node->nodeid, node->seq_next);
+                               dlm_send_ack(node->nodeid, nval);
 
                                /* passive shutdown DLM_LAST_ACK case 1
                                 * additional we check if the node is used by
@@ -522,14 +532,14 @@ static void dlm_midcomms_receive_buffer(union dlm_packet 
*p,
                                }
                                break;
                        case DLM_FIN_WAIT1:
-                               dlm_send_ack(node->nodeid, node->seq_next);
+                               dlm_send_ack(node->nodeid, nval);
                                node->state = DLM_CLOSING;
                                set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
                                pr_debug("switch node %d to state %s\n",
                                         node->nodeid, 
dlm_state_str(node->state));
                                break;
                        case DLM_FIN_WAIT2:
-                               dlm_send_ack(node->nodeid, node->seq_next);
+                               dlm_send_ack(node->nodeid, nval);
                                midcomms_node_reset(node);
                                pr_debug("switch node %d to state %s\n",
                                         node->nodeid, 
dlm_state_str(node->state));
@@ -557,11 +567,11 @@ static void dlm_midcomms_receive_buffer(union dlm_packet 
*p,
                /* retry to ack message which we already have by sending back
                 * current node->seq_next number as ack.
                 */
-               if (seq < node->seq_next)
-                       dlm_send_ack(node->nodeid, node->seq_next);
+               if (seq < oval)
+                       dlm_send_ack(node->nodeid, oval);
 
                log_print_ratelimited("ignore dlm msg because seq mismatch, 
seq: %u, expected: %u, nodeid: %d",
-                                     seq, node->seq_next, node->nodeid);
+                                     seq, oval, node->nodeid);
        }
 }
 
@@ -992,7 +1002,7 @@ void dlm_midcomms_receive_done(int nodeid)
        switch (node->state) {
        case DLM_ESTABLISHED:
                spin_unlock(&node->state_lock);
-               dlm_send_ack(node->nodeid, node->seq_next);
+               dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
                break;
        default:
                spin_unlock(&node->state_lock);
@@ -1058,7 +1068,7 @@ static void midcomms_new_msg_cb(void *data)
        list_add_tail_rcu(&mh->list, &mh->node->send_queue);
        spin_unlock_bh(&mh->node->send_queue_lock);
 
-       mh->seq = mh->node->seq_send++;
+       mh->seq = atomic_fetch_inc(&mh->node->seq_send);
 }
 
 static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int 
nodeid,
@@ -1530,7 +1540,7 @@ static void midcomms_new_rawmsg_cb(void *data)
                switch (h->h_cmd) {
                case DLM_OPTS:
                        if (!h->u.h_seq)
-                               h->u.h_seq = cpu_to_le32(rd->node->seq_send++);
+                               h->u.h_seq = 
cpu_to_le32(atomic_fetch_inc(&rd->node->seq_send));
                        break;
                default:
                        break;
-- 
2.31.1

Reply via email to