Hello,

The following is a draft patch which implements atomic workqueues and
convert dm-crypt to use it instead of tasklet. It's an early draft and very
lightly tested but seems to work more or less. It's on top of wq/for6.9 + a
pending patchset. The following git branch can be used for testing.

  git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git wq-atomic-draft

I'll go over it to make sure all the pieces work. While it adds some
complications, it doesn't seem too bad and conversion from tasklet should be
straightforward too.

- It hooks into tasklet[_hi] for now but if we get to update all of tasklet
  users, we can just repurpose the tasklet softirq slots directly.

- I thought about allowing busy-waits for flushes and cancels but it didn't
  seem necessary. Keeping them blocking has the benefit of avoiding possible
  nasty deadlocks. We can revisit if there's need.

- Compared to tasklet, each work item goes through a bit more management
  code because I wanted to keep the code as unified as possible to regular
  threaded workqueues. That said, it's not a huge amount and my bet is that
  the difference is unlikely to be noticeable.

Thanks.

>From 8224d2602ef454ca164f4added765dc4dddd5e16 Mon Sep 17 00:00:00 2001
From: Tejun Heo <t...@kernel.org>
Date: Fri, 26 Jan 2024 13:21:42 -1000
Subject: [PATCH] workqueue: DRAFT: Implement atomic workqueue and convert
 dmcrypt to use it

---
 drivers/md/dm-crypt.c       |  36 +-----
 include/linux/workqueue.h   |   6 +
 kernel/workqueue.c          | 234 +++++++++++++++++++++++++++---------
 kernel/workqueue_internal.h |   3 +
 4 files changed, 186 insertions(+), 93 deletions(-)

diff --git a/drivers/md/dm-crypt.c b/drivers/md/dm-crypt.c
index 855b482cbff1..d375285db202 100644
--- a/drivers/md/dm-crypt.c
+++ b/drivers/md/dm-crypt.c
@@ -73,11 +73,8 @@ struct dm_crypt_io {
        struct bio *base_bio;
        u8 *integrity_metadata;
        bool integrity_metadata_from_pool:1;
-       bool in_tasklet:1;
 
        struct work_struct work;
-       struct tasklet_struct tasklet;
-
        struct convert_context ctx;
 
        atomic_t io_pending;
@@ -1762,7 +1759,6 @@ static void crypt_io_init(struct dm_crypt_io *io, struct 
crypt_config *cc,
        io->ctx.r.req = NULL;
        io->integrity_metadata = NULL;
        io->integrity_metadata_from_pool = false;
-       io->in_tasklet = false;
        atomic_set(&io->io_pending, 0);
 }
 
@@ -1771,13 +1767,6 @@ static void crypt_inc_pending(struct dm_crypt_io *io)
        atomic_inc(&io->io_pending);
 }
 
-static void kcryptd_io_bio_endio(struct work_struct *work)
-{
-       struct dm_crypt_io *io = container_of(work, struct dm_crypt_io, work);
-
-       bio_endio(io->base_bio);
-}
-
 /*
  * One of the bios was finished. Check for completion of
  * the whole request and correctly clean up the buffer.
@@ -1800,21 +1789,6 @@ static void crypt_dec_pending(struct dm_crypt_io *io)
                kfree(io->integrity_metadata);
 
        base_bio->bi_status = error;
-
-       /*
-        * If we are running this function from our tasklet,
-        * we can't call bio_endio() here, because it will call
-        * clone_endio() from dm.c, which in turn will
-        * free the current struct dm_crypt_io structure with
-        * our tasklet. In this case we need to delay bio_endio()
-        * execution to after the tasklet is done and dequeued.
-        */
-       if (io->in_tasklet) {
-               INIT_WORK(&io->work, kcryptd_io_bio_endio);
-               queue_work(cc->io_queue, &io->work);
-               return;
-       }
-
        bio_endio(base_bio);
 }
 
@@ -2246,11 +2220,6 @@ static void kcryptd_crypt(struct work_struct *work)
                kcryptd_crypt_write_convert(io);
 }
 
-static void kcryptd_crypt_tasklet(unsigned long work)
-{
-       kcryptd_crypt((struct work_struct *)work);
-}
-
 static void kcryptd_queue_crypt(struct dm_crypt_io *io)
 {
        struct crypt_config *cc = io->cc;
@@ -2263,9 +2232,8 @@ static void kcryptd_queue_crypt(struct dm_crypt_io *io)
                 * it is being executed with irqs disabled.
                 */
                if (in_hardirq() || irqs_disabled()) {
-                       io->in_tasklet = true;
-                       tasklet_init(&io->tasklet, kcryptd_crypt_tasklet, 
(unsigned long)&io->work);
-                       tasklet_schedule(&io->tasklet);
+                       INIT_WORK(&io->work, kcryptd_crypt);
+                       queue_work(system_atomic_wq, &io->work);
                        return;
                }
 
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 232baea90a1d..1e4938b5b176 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -353,6 +353,7 @@ static inline unsigned int work_static(struct work_struct 
*work) { return 0; }
  * Documentation/core-api/workqueue.rst.
  */
 enum wq_flags {
+       WQ_ATOMIC               = 1 << 0, /* execute in softirq context */
        WQ_UNBOUND              = 1 << 1, /* not bound to any cpu */
        WQ_FREEZABLE            = 1 << 2, /* freeze during suspend */
        WQ_MEM_RECLAIM          = 1 << 3, /* may be used for memory reclaim */
@@ -392,6 +393,9 @@ enum wq_flags {
        __WQ_ORDERED            = 1 << 17, /* internal: workqueue is ordered */
        __WQ_LEGACY             = 1 << 18, /* internal: create*_workqueue() */
        __WQ_ORDERED_EXPLICIT   = 1 << 19, /* internal: 
alloc_ordered_workqueue() */
+
+       /* atomic wq only allows the following flags */
+       __WQ_ATOMIC_ALLOWS      = WQ_ATOMIC | WQ_HIGHPRI,
 };
 
 enum wq_consts {
@@ -442,6 +446,8 @@ extern struct workqueue_struct *system_unbound_wq;
 extern struct workqueue_struct *system_freezable_wq;
 extern struct workqueue_struct *system_power_efficient_wq;
 extern struct workqueue_struct *system_freezable_power_efficient_wq;
+extern struct workqueue_struct *system_atomic_wq;
+extern struct workqueue_struct *system_atomic_highpri_wq;
 
 /**
  * alloc_workqueue - allocate a workqueue
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 23740c9ed57a..2a8f21494676 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -73,7 +73,8 @@ enum worker_pool_flags {
         * wq_pool_attach_mutex to avoid changing binding state while
         * worker_attach_to_pool() is in progress.
         */
-       POOL_MANAGER_ACTIVE     = 1 << 0,       /* being managed */
+       POOL_ATOMIC             = 1 << 0,       /* is an atomic pool */
+       POOL_MANAGER_ACTIVE     = 1 << 1,       /* being managed */
        POOL_DISASSOCIATED      = 1 << 2,       /* cpu can't serve workers */
 };
 
@@ -115,6 +116,14 @@ enum wq_internal_consts {
        WQ_NAME_LEN             = 32,
 };
 
+/*
+ * We don't want to trap softirq for too long. See MAX_SOFTIRQ_TIME and
+ * MAX_SOFTIRQ_RESTART in kernel/softirq.c. These are macros because
+ * msecs_to_jiffies() can't be an initializer.
+ */
+#define ATOMIC_WORKER_JIFFIES  msecs_to_jiffies(2)
+#define ATOMIC_WORKER_RESTARTS 10
+
 /*
  * Structure fields follow one of the following exclusion rules.
  *
@@ -441,8 +450,13 @@ static bool wq_debug_force_rr_cpu = false;
 #endif
 module_param_named(debug_force_rr_cpu, wq_debug_force_rr_cpu, bool, 0644);
 
+/* the atomic worker pools */
+static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
+                                    atomic_worker_pools);
+
 /* the per-cpu worker pools */
-static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], 
cpu_worker_pools);
+static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
+                                    cpu_worker_pools);
 
 static DEFINE_IDR(worker_pool_idr);    /* PR: idr of all pools */
 
@@ -476,8 +490,13 @@ struct workqueue_struct *system_power_efficient_wq 
__ro_after_init;
 EXPORT_SYMBOL_GPL(system_power_efficient_wq);
 struct workqueue_struct *system_freezable_power_efficient_wq __ro_after_init;
 EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
+struct workqueue_struct *system_atomic_wq;
+EXPORT_SYMBOL_GPL(system_atomic_wq);
+struct workqueue_struct *system_atomic_highpri_wq;
+EXPORT_SYMBOL_GPL(system_atomic_highpri_wq);
 
 static int worker_thread(void *__worker);
+static void atomic_worker_taskletfn(struct tasklet_struct *tasklet);
 static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
 static void show_pwq(struct pool_workqueue *pwq);
 static void show_one_worker_pool(struct worker_pool *pool);
@@ -496,6 +515,11 @@ static void show_one_worker_pool(struct worker_pool *pool);
                         !lockdep_is_held(&wq_pool_mutex),              \
                         "RCU, wq->mutex or wq_pool_mutex should be held")
 
+#define for_each_atomic_worker_pool(pool, cpu)                         \
+       for ((pool) = &per_cpu(atomic_worker_pools, cpu)[0];            \
+            (pool) < &per_cpu(atomic_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
+            (pool)++)
+
 #define for_each_cpu_worker_pool(pool, cpu)                            \
        for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0];               \
             (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
@@ -1184,6 +1208,14 @@ static bool kick_pool(struct worker_pool *pool)
        if (!need_more_worker(pool) || !worker)
                return false;
 
+       if (pool->flags & POOL_ATOMIC) {
+               if (pool->attrs->nice == HIGHPRI_NICE_LEVEL)
+                       tasklet_hi_schedule(&worker->atomic_tasklet);
+               else
+                       tasklet_schedule(&worker->atomic_tasklet);
+               return true;
+       }
+
        p = worker->task;
 
 #ifdef CONFIG_SMP
@@ -1663,8 +1695,15 @@ static bool pwq_tryinc_nr_active(struct pool_workqueue 
*pwq, bool fill)
        lockdep_assert_held(&pool->lock);
 
        if (!nna) {
-               /* per-cpu workqueue, pwq->nr_active is sufficient */
-               obtained = pwq->nr_active < READ_ONCE(wq->max_active);
+               /*
+                * An atomic workqueue always have a single worker per-cpu and
+                * doesn't impose additional max_active limit. For a per-cpu
+                * workqueue, checking pwq->nr_active is sufficient.
+                */
+               if (wq->flags & WQ_ATOMIC)
+                       obtained = true;
+               else
+                       obtained = pwq->nr_active < READ_ONCE(wq->max_active);
                goto out;
        }
 
@@ -2591,27 +2630,31 @@ static struct worker *create_worker(struct worker_pool 
*pool)
 
        worker->id = id;
 
-       if (pool->cpu >= 0)
-               snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
-                        pool->attrs->nice < 0  ? "H" : "");
-       else
-               snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
-
-       worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
-                                             "kworker/%s", id_buf);
-       if (IS_ERR(worker->task)) {
-               if (PTR_ERR(worker->task) == -EINTR) {
-                       pr_err("workqueue: Interrupted when creating a worker 
thread \"kworker/%s\"\n",
-                              id_buf);
-               } else {
-                       pr_err_once("workqueue: Failed to create a worker 
thread: %pe",
-                                   worker->task);
+       if (pool->flags & POOL_ATOMIC) {
+               tasklet_setup(&worker->atomic_tasklet, atomic_worker_taskletfn);
+       } else {
+               if (pool->cpu >= 0)
+                       snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, 
id,
+                                pool->attrs->nice < 0  ? "H" : "");
+               else
+                       snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, 
id);
+
+               worker->task = kthread_create_on_node(worker_thread, worker,
+                                       pool->node, "kworker/%s", id_buf);
+               if (IS_ERR(worker->task)) {
+                       if (PTR_ERR(worker->task) == -EINTR) {
+                               pr_err("workqueue: Interrupted when creating a 
worker thread \"kworker/%s\"\n",
+                                      id_buf);
+                       } else {
+                               pr_err_once("workqueue: Failed to create a 
worker thread: %pe",
+                                           worker->task);
+                       }
+                       goto fail;
                }
-               goto fail;
-       }
 
-       set_user_nice(worker->task, pool->attrs->nice);
-       kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
+               set_user_nice(worker->task, pool->attrs->nice);
+               kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
+       }
 
        /* successful, attach the worker to the pool */
        worker_attach_to_pool(worker, pool);
@@ -2627,7 +2670,8 @@ static struct worker *create_worker(struct worker_pool 
*pool)
         * check if not woken up soon. As kick_pool() is noop if @pool is empty,
         * wake it up explicitly.
         */
-       wake_up_process(worker->task);
+       if (worker->task)
+               wake_up_process(worker->task);
 
        raw_spin_unlock_irq(&pool->lock);
 
@@ -3043,25 +3087,35 @@ __acquires(&pool->lock)
        lock_map_release(&lockdep_map);
        lock_map_release(&pwq->wq->lockdep_map);
 
-       if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
-                    rcu_preempt_depth() > 0)) {
-               pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
-                      "     last function: %ps\n",
-                      current->comm, preempt_count(), rcu_preempt_depth(),
-                      task_pid_nr(current), worker->current_func);
-               debug_show_held_locks(current);
-               dump_stack();
-       }
+       if (worker->task) {
+               if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
+                            rcu_preempt_depth() > 0)) {
+                       pr_err("BUG: workqueue leaked lock or atomic: 
%s/0x%08x/%d/%d\n"
+                              "     last function: %ps\n",
+                              current->comm, preempt_count(),
+                              rcu_preempt_depth(), task_pid_nr(current),
+                              worker->current_func);
+                       debug_show_held_locks(current);
+                       dump_stack();
+               }
 
-       /*
-        * The following prevents a kworker from hogging CPU on !PREEMPTION
-        * kernels, where a requeueing work item waiting for something to
-        * happen could deadlock with stop_machine as such work item could
-        * indefinitely requeue itself while all other CPUs are trapped in
-        * stop_machine. At the same time, report a quiescent RCU state so
-        * the same condition doesn't freeze RCU.
-        */
-       cond_resched();
+               /*
+                * The following prevents a kworker from hogging CPU on
+                * !PREEMPTION kernels, where a requeueing work item waiting for
+                * something to happen could deadlock with stop_machine as such
+                * work item could indefinitely requeue itself while all other
+                * CPUs are trapped in stop_machine. At the same time, report a
+                * quiescent RCU state so the same condition doesn't freeze RCU.
+                */
+               if (worker->task)
+                       cond_resched();
+       } else {
+               if (unlikely(lockdep_depth(current) > 0)) {
+                       pr_err("BUG: atomic workqueue leaked lock: last 
function: %ps\n",
+                              worker->current_func);
+                       debug_show_held_locks(current);
+               }
+       }
 
        raw_spin_lock_irq(&pool->lock);
 
@@ -3344,6 +3398,44 @@ static int rescuer_thread(void *__rescuer)
        goto repeat;
 }
 
+void atomic_worker_taskletfn(struct tasklet_struct *tasklet)
+{
+       struct worker *worker =
+               container_of(tasklet, struct worker, atomic_tasklet);
+       struct worker_pool *pool = worker->pool;
+       int nr_restarts = ATOMIC_WORKER_RESTARTS;
+       unsigned long end = jiffies + ATOMIC_WORKER_JIFFIES;
+
+       raw_spin_lock_irq(&pool->lock);
+       worker_leave_idle(worker);
+
+       /*
+        * This function follows the structure of worker_thread(). See there for
+        * explanations on each step.
+        */
+       if (need_more_worker(pool))
+               goto done;
+
+       WARN_ON_ONCE(!list_empty(&worker->scheduled));
+       worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
+
+       do {
+               struct work_struct *work =
+                       list_first_entry(&pool->worklist,
+                                        struct work_struct, entry);
+
+               if (assign_work(work, worker, NULL))
+                       process_scheduled_works(worker);
+       } while (--nr_restarts && time_before(jiffies, end) &&
+                keep_working(pool));
+
+       worker_set_flags(worker, WORKER_PREP);
+done:
+       worker_enter_idle(worker);
+       kick_pool(pool);
+       raw_spin_unlock_irq(&pool->lock);
+}
+
 /**
  * check_flush_dependency - check for flush dependency sanity
  * @target_wq: workqueue being flushed
@@ -5149,6 +5241,13 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
        size_t wq_size;
        int name_len;
 
+       if (flags & WQ_ATOMIC) {
+               if (WARN_ON_ONCE(flags & ~__WQ_ATOMIC_ALLOWS))
+                       return NULL;
+               if (WARN_ON_ONCE(max_active))
+                       return NULL;
+       }
+
        /*
         * Unbound && max_active == 1 used to imply ordered, which is no longer
         * the case on many machines due to per-pod pools. While
@@ -7094,6 +7193,22 @@ static void __init restrict_unbound_cpumask(const char 
*name, const struct cpuma
        cpumask_and(wq_unbound_cpumask, wq_unbound_cpumask, mask);
 }
 
+static void __init init_cpu_worker_pool(struct worker_pool *pool, int cpu, int 
nice)
+{
+       BUG_ON(init_worker_pool(pool));
+       pool->cpu = cpu;
+       cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
+       cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
+       pool->attrs->nice = nice;
+       pool->attrs->affn_strict = true;
+       pool->node = cpu_to_node(cpu);
+
+       /* alloc pool ID */
+       mutex_lock(&wq_pool_mutex);
+       BUG_ON(worker_pool_assign_id(pool));
+       mutex_unlock(&wq_pool_mutex);
+}
+
 /**
  * workqueue_init_early - early init for workqueue subsystem
  *
@@ -7149,25 +7264,19 @@ void __init workqueue_init_early(void)
        pt->pod_node[0] = NUMA_NO_NODE;
        pt->cpu_pod[0] = 0;
 
-       /* initialize CPU pools */
+       /* initialize atomic and CPU pools */
        for_each_possible_cpu(cpu) {
                struct worker_pool *pool;
 
                i = 0;
-               for_each_cpu_worker_pool(pool, cpu) {
-                       BUG_ON(init_worker_pool(pool));
-                       pool->cpu = cpu;
-                       cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
-                       cpumask_copy(pool->attrs->__pod_cpumask, 
cpumask_of(cpu));
-                       pool->attrs->nice = std_nice[i++];
-                       pool->attrs->affn_strict = true;
-                       pool->node = cpu_to_node(cpu);
-
-                       /* alloc pool ID */
-                       mutex_lock(&wq_pool_mutex);
-                       BUG_ON(worker_pool_assign_id(pool));
-                       mutex_unlock(&wq_pool_mutex);
+               for_each_atomic_worker_pool(pool, cpu) {
+                       init_cpu_worker_pool(pool, cpu, std_nice[i++]);
+                       pool->flags |= POOL_ATOMIC;
                }
+
+               i = 0;
+               for_each_cpu_worker_pool(pool, cpu)
+                       init_cpu_worker_pool(pool, cpu, std_nice[i++]);
        }
 
        /* create default unbound and ordered wq attrs */
@@ -7200,10 +7309,14 @@ void __init workqueue_init_early(void)
        system_freezable_power_efficient_wq = 
alloc_workqueue("events_freezable_pwr_efficient",
                                              WQ_FREEZABLE | WQ_POWER_EFFICIENT,
                                              0);
+       system_atomic_wq = alloc_workqueue("system_atomic_wq", WQ_ATOMIC, 0);
+       system_atomic_highpri_wq = alloc_workqueue("system_atomic_highpri_wq",
+                                                  WQ_ATOMIC | WQ_HIGHPRI, 0);
        BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
               !system_unbound_wq || !system_freezable_wq ||
               !system_power_efficient_wq ||
-              !system_freezable_power_efficient_wq);
+              !system_freezable_power_efficient_wq ||
+              !system_atomic_wq || !system_atomic_highpri_wq);
 }
 
 static void __init wq_cpu_intensive_thresh_init(void)
@@ -7269,9 +7382,10 @@ void __init workqueue_init(void)
         * up. Also, create a rescuer for workqueues that requested it.
         */
        for_each_possible_cpu(cpu) {
-               for_each_cpu_worker_pool(pool, cpu) {
+               for_each_atomic_worker_pool(pool, cpu)
+                       pool->node = cpu_to_node(cpu);
+               for_each_cpu_worker_pool(pool, cpu)
                        pool->node = cpu_to_node(cpu);
-               }
        }
 
        list_for_each_entry(wq, &workqueues, list) {
@@ -7284,6 +7398,8 @@ void __init workqueue_init(void)
 
        /* create the initial workers */
        for_each_online_cpu(cpu) {
+               for_each_atomic_worker_pool(pool, cpu)
+                       BUG_ON(!create_worker(pool));
                for_each_cpu_worker_pool(pool, cpu) {
                        pool->flags &= ~POOL_DISASSOCIATED;
                        BUG_ON(!create_worker(pool));
diff --git a/kernel/workqueue_internal.h b/kernel/workqueue_internal.h
index f6275944ada7..f65f204f38ea 100644
--- a/kernel/workqueue_internal.h
+++ b/kernel/workqueue_internal.h
@@ -10,6 +10,7 @@
 
 #include <linux/workqueue.h>
 #include <linux/kthread.h>
+#include <linux/interrupt.h>
 #include <linux/preempt.h>
 
 struct worker_pool;
@@ -42,6 +43,8 @@ struct worker {
        struct list_head        scheduled;      /* L: scheduled works */
 
        struct task_struct      *task;          /* I: worker task */
+       struct tasklet_struct   atomic_tasklet; /* I: tasklet for atomic pool */
+
        struct worker_pool      *pool;          /* A: the associated pool */
                                                /* L: for rescuers */
        struct list_head        node;           /* A: anchored at pool->workers 
*/
-- 
2.43.0


Reply via email to