On Tue, Oct 10, 2023 at 06:04:48PM -0400, Alexander Aring wrote:
> If there is a burst of messages, the receive worker fills up the
> processing queue faster than the messages can be processed. This patch
> slows down the receive worker so the data stays in the socket buffer,
> which tells the sender to back off. This is done with a threshold: the
> next buffers are only read from the socket after all queued messages
> have been processed, enforced by a flush_workqueue(). This only happens
> during a message burst, e.g. when creating 1 million locks. Without it,
> more and more messages would be queued on the processqueue and we would
> soon run out of memory.
> 
> Signed-off-by: Alexander Aring <aahri...@redhat.com>
> ---
>  fs/dlm/lowcomms.c | 12 ++++++++++++
>  1 file changed, 12 insertions(+)
> 
> diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
> index f7bc22e74db2..67f8dd8a05ef 100644
> --- a/fs/dlm/lowcomms.c
> +++ b/fs/dlm/lowcomms.c
> @@ -63,6 +63,7 @@
>  #include "config.h"
>  
>  #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
> +#define DLM_MAX_PROCESS_BUFFERS 24
>  #define NEEDED_RMEM (4*1024*1024)
>  
>  struct connection {
> @@ -194,6 +195,7 @@ static const struct dlm_proto_ops *dlm_proto_ops;
>  #define DLM_IO_END 1
>  #define DLM_IO_EOF 2
>  #define DLM_IO_RESCHED 3
> +#define DLM_IO_FLUSH 4
>  
>  static void process_recv_sockets(struct work_struct *work);
>  static void process_send_sockets(struct work_struct *work);
> @@ -202,6 +204,7 @@ static void process_dlm_messages(struct work_struct *work);
>  static DECLARE_WORK(process_work, process_dlm_messages);
>  static DEFINE_SPINLOCK(processqueue_lock);
>  static bool process_dlm_messages_pending;
> +static atomic_t processqueue_count;
>  static LIST_HEAD(processqueue);
>  
>  bool dlm_lowcomms_is_running(void)
> @@ -874,6 +877,7 @@ static void process_dlm_messages(struct work_struct *work)
>       }
>  
>       list_del(&pentry->list);
> +     atomic_dec(&processqueue_count);
>       spin_unlock(&processqueue_lock);
>  
>       for (;;) {
> @@ -891,6 +895,7 @@ static void process_dlm_messages(struct work_struct *work)
>               }
>  
>               list_del(&pentry->list);
> +             atomic_dec(&processqueue_count);
>               spin_unlock(&processqueue_lock);
>       }
>  }
> @@ -962,6 +967,7 @@ static int receive_from_sock(struct connection *con, int buflen)
>               con->rx_leftover);
>  
>       spin_lock(&processqueue_lock);
> +     ret = atomic_inc_return(&processqueue_count);
>       list_add_tail(&pentry->list, &processqueue);
>       if (!process_dlm_messages_pending) {
>               process_dlm_messages_pending = true;
> @@ -969,6 +975,9 @@ static int receive_from_sock(struct connection *con, int buflen)
>       }
>       spin_unlock(&processqueue_lock);
>  
> +     if (ret > DLM_MAX_PROCESS_BUFFERS)
> +             return DLM_IO_FLUSH;
> +
>       return DLM_IO_SUCCESS;
>  }
>  
> @@ -1503,6 +1512,9 @@ static void process_recv_sockets(struct work_struct *work)
>               wake_up(&con->shutdown_wait);
>               /* CF_RECV_PENDING cleared */
>               break;
> +     case DLM_IO_FLUSH:
> +             flush_workqueue(process_workqueue);
> +             fallthrough;
>       case DLM_IO_RESCHED:
>               cond_resched();
>               queue_work(io_workqueue, &con->rwork);
> -- 
> 2.39.3
> 

<formletter>

This is not the correct way to submit patches for inclusion in the
stable kernel tree.  Please read:
    https://www.kernel.org/doc/html/latest/process/stable-kernel-rules.html
for how to do this properly.

</formletter>