Use an RBD device lock in addition to the local device CAW semaphore to
ensure that COMPARE AND WRITE operations are handled atomically
cluster-wide.
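
For illustration, the expected caller-side usage in the COMPARE AND WRITE
path is sketched below. This is not part of the patch; in particular the
se_device member used to reach the registered cluster API ("cluster_api")
is a placeholder name:

  /* sketch only: se_dev->cluster_api is an illustrative member name */
  ret = se_dev->cluster_api->caw_lock(se_dev);
  if (ret < 0)
          return TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;

  /* ... read, compare against the verify buffer, then write ... */

  se_dev->cluster_api->caw_unlock(se_dev);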

Lock acquisition is handled via a new workqueue, which currently polls
for the RBD device lock until it can be acquired or a timeout expires.
This polling behaviour should be replaced in future with watch/notify
functionality similar to that used for RBD_NOTIFY_OP_SCSI_LUN_RESET.
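
As a rough illustration of that future direction, a lock-released
notification handler could kick the pending lock attempt immediately
instead of waiting out the poll interval. The handler below is purely
hypothetical (no such notification opcode exists yet):

  /* hypothetical: invoked from the rbd watch callback on a lock-released
   * notification for the CAW lock */
  static void rbd_tcm_caw_lock_released(struct rbd_tcm_device *rbd_tcm_dev)
  {
          struct rbd_tcm_caw_locker *caw_locker = rbd_tcm_dev->caw_locker;

          if (caw_locker)
                  mod_delayed_work(rbd_tcm_dev->caw_wq,
                                   &caw_locker->caw_work, 0);
  }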

Signed-off-by: David Disseldorp <[email protected]>
---
 drivers/block/rbd_tcm.c              | 208 +++++++++++++++++++++++++++++++++++
 include/target/target_core_cluster.h |   4 +
 2 files changed, 212 insertions(+)

diff --git a/drivers/block/rbd_tcm.c b/drivers/block/rbd_tcm.c
index f3ee6ff..47a006f 100644
--- a/drivers/block/rbd_tcm.c
+++ b/drivers/block/rbd_tcm.c
@@ -36,11 +36,26 @@ struct rbd_tcm_reset_event {
        u64 notify_id;
 };
 
+#define RBD_TCM_CAW_LOCK_POLL_HZ       (1 * HZ)
+#define RBD_TCM_CAW_LOCK_TOUT_HZ       (5 * HZ)
+
+struct rbd_tcm_caw_locker {
+       struct delayed_work caw_work;
+       struct delayed_work caw_tout_work;
+       struct completion lock_finished;
+       int lock_rc;
+       int retries;
+       struct rbd_tcm_device *rbd_tcm_dev;
+};
+
 struct rbd_tcm_device {
        struct rbd_device *rbd_dev;
        struct se_device *se_dev;
 
        struct rbd_tcm_reset_event reset_evt;
+
+       struct rbd_tcm_caw_locker *caw_locker;
+       struct workqueue_struct *caw_wq;
 };
 
 static int rbd_tcm_start_reset(struct se_device *se_dev, u32 timeout)
@@ -92,6 +107,8 @@ static int rbd_tcm_detach_device(struct se_device *se_dev)
        struct request_queue *q = ibock_se_device_to_q(se_dev);
        struct rbd_tcm_device *rbd_tcm_dev = se_dev->cluster_dev_data;
 
+       destroy_workqueue(rbd_tcm_dev->caw_wq);
+
        cancel_work_sync(&rbd_tcm_dev->reset_evt.work);
        se_dev->cluster_dev_data = NULL;
        rbd_detach_tcm_dev(q->queuedata);
@@ -112,16 +129,207 @@ static int rbd_tcm_attach_device(struct se_device *se_dev)
        INIT_WORK(&rbd_tcm_dev->reset_evt.work, rbd_tcm_reset_event_workfn);
        rbd_tcm_dev->reset_evt.rbd_tcm_dev = rbd_tcm_dev;
 
+       /* work queue to serialise COMPARE AND WRITE handling */
+       rbd_tcm_dev->caw_wq = alloc_workqueue("caw-rbd",
+                                             WQ_MEM_RECLAIM | WQ_UNBOUND, 1);
+       if (!rbd_tcm_dev->caw_wq) {
+               rbd_warn(rbd_tcm_dev->rbd_dev,
+                        "Unable to create CAW workqueue for rbd");
+               kfree(rbd_tcm_dev);
+               return -ENOMEM;
+       }
+
        se_dev->cluster_dev_data = rbd_tcm_dev;
        return rbd_attach_tcm_dev(q->queuedata, rbd_tcm_dev);
 }
 
+static void
+rbd_tcm_caw_lock_dispatch(struct work_struct *work)
+{
+       struct rbd_tcm_caw_locker *caw_locker
+               = container_of(work, struct rbd_tcm_caw_locker, caw_work.work);
+       struct rbd_device *rbd_dev = caw_locker->rbd_tcm_dev->rbd_dev;
+       int ret;
+
+       pr_debug("CAW lock dispatch running\n");
+
+       ret = rbd_dev_lock(rbd_dev, "rbd_tcm_caw", 1, "cookie", "",
+                          "exclusive COMPARE AND WRITE lock", 0);
+       if (ret == -EEXIST) {
+               rbd_warn(rbd_dev, "CAW lock conflict, deferring operation");
+               /* TODO use unlock notification, instead of polling */
+               caw_locker->retries++;
+               queue_delayed_work(caw_locker->rbd_tcm_dev->caw_wq,
+                                  &caw_locker->caw_work,
+                                  RBD_TCM_CAW_LOCK_POLL_HZ);
+               return;
+       } else if (ret < 0) {
+               rbd_warn(rbd_dev, "failed to obtain CAW lock: %d", ret);
+               caw_locker->lock_rc = ret;
+               complete(&caw_locker->lock_finished);
+               return;
+       }
+
+       pr_debug("acquired COMPARE AND WRITE lock after %d retries\n",
+                caw_locker->retries);
+
+       cancel_delayed_work_sync(&caw_locker->caw_tout_work);
+       caw_locker->lock_rc = 0;
+       complete(&caw_locker->lock_finished);
+}
+
+static void
+rbd_tcm_caw_lock_timeout(struct work_struct *work)
+{
+       struct rbd_tcm_caw_locker *caw_locker
+               = container_of(work, struct rbd_tcm_caw_locker,
+                              caw_tout_work.work);
+
+       pr_warn("CAW lock timeout running\n");
+
+       cancel_delayed_work_sync(&caw_locker->caw_work);
+       caw_locker->lock_rc = -ETIMEDOUT;
+       complete(&caw_locker->lock_finished);
+}
+
+/*
+ * Ensure cluster wide exclusive COMPARE AND WRITE access via an RBD device
+ * lock. Local exclusivity is handled via se_dev->caw_sem.
+ * In future, we may be able to offload the entire atomic read->compare->write
+ * sequence to the OSDs, which would make this interface pretty useless.
+ */
+static int
+rbd_tcm_caw_lock(struct se_device *se_dev)
+{
+       struct rbd_tcm_device *rbd_tcm_dev = se_dev->cluster_dev_data;
+       struct rbd_tcm_caw_locker *caw_locker;
+       int ret;
+
+       ret = down_interruptible(&se_dev->caw_sem);
+       if (ret != 0) {
+               pr_err("failed to obtain local semaphore\n");
+               return ret;
+       }
+
+       BUG_ON(rbd_tcm_dev->caw_locker != NULL);
+
+       pr_debug("got local CAW semaphore\n");
+
+       caw_locker = kzalloc(sizeof(*caw_locker), GFP_KERNEL);
+       if (!caw_locker) {
+               pr_err("Unable to allocate caw_locker\n");
+               ret = -ENOMEM;
+               goto err_sem_up;
+       }
+
+       init_completion(&caw_locker->lock_finished);
+       /* whichever work finishes first cancels the other */
+       INIT_DELAYED_WORK(&caw_locker->caw_work, rbd_tcm_caw_lock_dispatch);
+       INIT_DELAYED_WORK(&caw_locker->caw_tout_work, rbd_tcm_caw_lock_timeout);
+       caw_locker->lock_rc = -EINTR;
+       caw_locker->rbd_tcm_dev = rbd_tcm_dev;
+
+       rbd_tcm_dev->caw_locker = caw_locker;
+
+       queue_delayed_work(rbd_tcm_dev->caw_wq, &caw_locker->caw_work, 0);
+       queue_delayed_work(rbd_tcm_dev->caw_wq, &caw_locker->caw_tout_work,
+                          RBD_TCM_CAW_LOCK_TOUT_HZ);
+       pr_debug("work queued, awaiting completion\n");
+       wait_for_completion(&caw_locker->lock_finished);
+       if (caw_locker->lock_rc < 0) {
+               ret = caw_locker->lock_rc;
+               goto err_locker_free;
+       }
+
+       /* caw_locker freed following unlock */
+
+       return 0;
+
+err_locker_free:
+       kfree(caw_locker);
+       rbd_tcm_dev->caw_locker = NULL;
+err_sem_up:
+       up(&se_dev->caw_sem);
+       pr_debug("dropped local CAW semaphore on failure\n");
+       return ret;
+}
+
+static void
+rbd_tcm_caw_unlock_dispatch(struct work_struct *work)
+{
+       struct rbd_tcm_caw_locker *caw_locker
+               = container_of(work, struct rbd_tcm_caw_locker, caw_work.work);
+       struct rbd_device *rbd_dev = caw_locker->rbd_tcm_dev->rbd_dev;
+       int ret;
+
+       pr_debug("CAW unlock dispatch running\n");
+
+       ret = rbd_dev_unlock(rbd_dev, "rbd_tcm_caw", "cookie");
+       if (ret < 0)
+               rbd_warn(rbd_dev, "failed to drop CAW lock: %d", ret);
+       else
+               pr_debug("dropped RBD COMPARE AND WRITE lock\n");
+
+       cancel_delayed_work_sync(&caw_locker->caw_tout_work);
+       caw_locker->lock_rc = ret;
+       complete(&caw_locker->lock_finished);
+}
+
+static void
+rbd_tcm_caw_unlock_timeout(struct work_struct *work)
+{
+       struct rbd_tcm_caw_locker *caw_locker
+               = container_of(work, struct rbd_tcm_caw_locker,
+                              caw_tout_work.work);
+
+       pr_warn("CAW unlock timeout running\n");
+
+       cancel_delayed_work_sync(&caw_locker->caw_work);
+       caw_locker->lock_rc = -ETIMEDOUT;
+       complete(&caw_locker->lock_finished);
+}
+
+static void
+rbd_tcm_caw_unlock(struct se_device *se_dev)
+{
+       struct rbd_tcm_device *rbd_tcm_dev = se_dev->cluster_dev_data;
+       struct rbd_tcm_caw_locker *caw_locker = rbd_tcm_dev->caw_locker;
+
+       /* only set if the lock was acquired successfully */
+       BUG_ON(caw_locker == NULL);
+
+       init_completion(&caw_locker->lock_finished);
+       INIT_DELAYED_WORK(&caw_locker->caw_work, rbd_tcm_caw_unlock_dispatch);
+       INIT_DELAYED_WORK(&caw_locker->caw_tout_work, rbd_tcm_caw_unlock_timeout);
+       caw_locker->lock_rc = -EINTR;
+       caw_locker->retries = 0;
+       caw_locker->rbd_tcm_dev = rbd_tcm_dev;
+
+       queue_delayed_work(rbd_tcm_dev->caw_wq, &caw_locker->caw_work, 0);
+       queue_delayed_work(rbd_tcm_dev->caw_wq, &caw_locker->caw_tout_work,
+                          RBD_TCM_CAW_LOCK_TOUT_HZ);
+       pr_debug("work queued, awaiting completion\n");
+       wait_for_completion(&caw_locker->lock_finished);
+       if (caw_locker->lock_rc < 0)
+               pr_warn("leaving stale RBD CAW lock\n");
+
+       kfree(caw_locker);
+       rbd_tcm_dev->caw_locker = NULL;
+
+       up(&se_dev->caw_sem);
+       pr_debug("dropped local CAW semaphore\n");
+}
+
 static struct se_cluster_api rbd_tcm_template = {
        .name           = "rbd",
        .owner          = THIS_MODULE,
        .reset_device   = rbd_tcm_start_reset,
        .attach_device  = rbd_tcm_attach_device,
        .detach_device  = rbd_tcm_detach_device,
+       .caw_lock       = rbd_tcm_caw_lock,
+       .caw_unlock     = rbd_tcm_caw_unlock,
 };
 
 int rbd_tcm_register(void)
diff --git a/include/target/target_core_cluster.h b/include/target/target_core_cluster.h
index 4860c2e..cc1e2aa 100644
--- a/include/target/target_core_cluster.h
+++ b/include/target/target_core_cluster.h
@@ -23,6 +23,10 @@ struct se_cluster_api {
         * takes longer than timeout seconds then -ETIMEDOUT should be returned.
         */
        int (*reset_device)(struct se_device *dev, u32 timeout);
+
+       /* exclusive device locking for atomic COMPARE AND WRITE */
+       int (*caw_lock)(struct se_device *se_dev);
+       void (*caw_unlock)(struct se_device *se_dev);
 };
 
 extern int core_cluster_api_register(struct se_cluster_api *);
-- 
2.1.4
