In the latest version of this message, a barrier
field is added that instructs clients not to use
the attached capabilities until they have seen a
particular OSD map epoch.
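
Concretely, the rule the barrier enforces reduces to a check like
the following sketch (client_may_use_caps() is a hypothetical
helper named here only for illustration; the patch implements the
equivalent logic inline in ceph_handle_caps(), delaying the
message when the check fails):

    /* Sketch only: client_may_use_caps() is not part of this patch. */
    static bool client_may_use_caps(struct ceph_osd_client *osdc,
                                    u32 epoch_barrier)
    {
            bool ready;

            down_read(&osdc->map_sem);
            ready = osdc->osdmap->epoch >= epoch_barrier;
            up_read(&osdc->map_sem);

            /* if not ready: queue the message and request a newer map */
            return ready;
    }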

Signed-off-by: John Spray <[email protected]>
---
 fs/ceph/caps.c       | 110 +++++++++++++++++++++++++++++++++++++++++++++++--
 fs/ceph/mds_client.c | 114 +++++++++++++++++++++++++++++++++++++++++++--------
 fs/ceph/mds_client.h |   9 ++++
 fs/ceph/super.h      |   3 +-
 4 files changed, 214 insertions(+), 22 deletions(-)

diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index eb1bf1f..99e3fdd 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -979,6 +979,8 @@ static int send_cap_msg(struct ceph_mds_session *session,
 {
        struct ceph_mds_caps *fc;
        struct ceph_msg *msg;
+       int msg_len;
+       __le32 *epoch_barrier;
 
        dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
             " seq %u/%u mseq %u follows %lld size %llu/%llu"
@@ -988,15 +990,31 @@ static int send_cap_msg(struct ceph_mds_session *session,
             seq, issue_seq, mseq, follows, size, max_size,
             xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
 
-       msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false);
+       /*
+        * MSG_CLIENT_CAPS version 5 size calculation:
+        *   sizeof(ceph_mds_caps) for the caps fields
+        *   0 bytes for the snapbl field (headerless)
+        *   4 bytes for the flockbl field (len = 0)
+        *   0 bytes for the peer field (op is not import/export)
+        *   8 bytes for the inline_version field
+        *   4 bytes for the inline_data field (len = 0)
+        *   4 bytes for the epoch barrier field
+        */
+       msg_len = sizeof(*fc) + 4 + 8 + 4 + 4;
+
+       msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, msg_len, GFP_NOFS, false);
        if (!msg)
                return -ENOMEM;
+       memset(msg->front.iov_base, 0, msg_len);
+
+       epoch_barrier = msg->front.iov_base + sizeof(*fc) + 4 + 8 + 4;
+       *epoch_barrier = cpu_to_le32(session->s_mdsc->cap_epoch_barrier);
 
+       msg->hdr.version = cpu_to_le16(5);
+       msg->hdr.compat_version = cpu_to_le16(1);
        msg->hdr.tid = cpu_to_le64(flush_tid);
 
        fc = msg->front.iov_base;
-       memset(fc, 0, sizeof(*fc));
-
        fc->cap_id = cpu_to_le64(cid);
        fc->op = cpu_to_le32(op);
        fc->seq = cpu_to_le32(seq);
@@ -2973,14 +2991,39 @@ retry:
        *target_cap = cap;
 }
 
+/**
+ * Delay handling a cap message until a given OSD epoch
+ *
+ * Call with session mutex held
+ * Call with OSD map_sem held for read
+ */
+static void delay_message(struct ceph_mds_session *session,
+                          struct ceph_msg *msg, u32 epoch)
+{
+       struct ceph_delayed_message *dm;
+
+       dm = kzalloc(sizeof(*dm), GFP_NOFS);
+       if (!dm)
+               return; /* drop the message on allocation failure */
+
+       ceph_msg_get(msg);
+       dm->dm_epoch = epoch;
+       dm->dm_msg = msg;
+
+       list_add(&dm->dm_item, &session->s_delayed_msgs);
+}
+
 /*
  * Handle a caps message from the MDS.
  *
  * Identify the appropriate session, inode, and call the right handler
  * based on the cap op.
+ *
+ * skip_epoch_check: skip checking epoch_barrier (avoids taking the
+ * mdsc and osdc locks)
  */
 void ceph_handle_caps(struct ceph_mds_session *session,
-                     struct ceph_msg *msg)
+                     struct ceph_msg *msg,
+                     bool skip_epoch_check)
 {
        struct ceph_mds_client *mdsc = session->s_mdsc;
        struct super_block *sb = mdsc->fsc->sb;
@@ -3001,6 +3044,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        void *flock;
        void *end;
        u32 flock_len;
+       u64 inline_version;
+       u32 inline_len;
+       u32 epoch_barrier = 0;
 
        dout("handle_caps from mds%d\n", mds);
 
@@ -3045,6 +3091,62 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                }
        }
 
+       if (le16_to_cpu(msg->hdr.version) >= 5) {
+               void *p = flock + flock_len;
+
+               /* skip the peer field if present */
+               if (op == CEPH_CAP_OP_IMPORT)
+                       p += sizeof(struct ceph_mds_cap_peer);
+
+               /* we don't use inline_version, but decode it to advance p */
+               ceph_decode_64_safe(&p, end, inline_version, bad);
+
+               /* read the inline_data length, then skip past the data */
+               ceph_decode_32_safe(&p, end, inline_len, bad);
+               if (inline_len != 0)
+                       p += inline_len;
+
+               /* read the epoch_barrier field */
+               ceph_decode_32_safe(&p, end, epoch_barrier, bad);
+       }
+
+       dout("handle_caps v=%d barrier=%d skip=%d\n",
+               le16_to_cpu(msg->hdr.version),
+               epoch_barrier,
+               skip_epoch_check);
+
+       if (epoch_barrier && !skip_epoch_check) {
+               struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
+
+               /*
+                * We are required to wait until we have this OSD map
+                * epoch before using the capability.
+                */
+               mutex_lock(&mdsc->mutex);
+               if (epoch_barrier > mdsc->cap_epoch_barrier)
+                       mdsc->cap_epoch_barrier = epoch_barrier;
+               mutex_unlock(&mdsc->mutex);
+
+               down_read(&osdc->map_sem);
+               if (osdc->osdmap->epoch < epoch_barrier) {
+                       dout("handle_caps delaying message until OSD epoch %d\n",
+                            epoch_barrier);
+                       mutex_lock(&session->s_mutex);
+                       delay_message(session, msg, epoch_barrier);
+                       mutex_unlock(&session->s_mutex);
+
+                       /* kick the OSD client to fetch the latest map */
+                       ceph_monc_request_next_osdmap(&osdc->client->monc);
+
+                       up_read(&osdc->map_sem);
+                       return;
+               } else {
+                       dout("handle_caps barrier %d already satisfied (%d)\n",
+                            epoch_barrier, osdc->osdmap->epoch);
+                       up_read(&osdc->map_sem);
+               }
+       }
+
        /* lookup ino */
        inode = ceph_find_inode(sb, vino);
        ci = ceph_inode(inode);
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 3f5bc23..5022c71 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -332,6 +332,94 @@ static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
 }
 
 
+/**
+ * Unlink and delete a ceph_delayed_message
+ */
+static void discard_delayed(struct ceph_mds_session *session,
+                           struct ceph_delayed_message *dm)
+{
+       dout("discard_delayed: putting msg %p\n", dm->dm_msg);
+       ceph_msg_put(dm->dm_msg);
+       list_del(&dm->dm_item);
+       kfree(dm);
+}
+
+/**
+ * Re-dispatch a delayed cap message, then free it
+ */
+static void replay_delayed(struct ceph_mds_session *session,
+                          struct ceph_delayed_message *dm)
+{
+       dout("replay_delayed: releasing delayed msg %p\n", dm->dm_msg);
+       ceph_handle_caps(session, dm->dm_msg, true);
+       discard_delayed(session, dm);
+}
+
+/**
+ * If a delayed message is ready to be replayed (its epoch
+ * has been reached), move it to replay_list
+ */
+static void find_ready_delayed(struct ceph_mds_session *session,
+                              struct ceph_delayed_message *dm,
+                              struct list_head *replay_list, u32 epoch)
+{
+       if (dm->dm_epoch <= epoch) {
+               dout("find_ready_delayed: delayed msg %p ready (%d vs %d)\n", 
dm->dm_msg, dm->dm_epoch, epoch);
+               list_del(&dm->dm_item);
+               list_add(&dm->dm_item, replay_list);
+       }
+}
+
+/**
+ * Call this with map_sem held for read
+ */
+static void handle_osd_map(struct ceph_osd_client *osdc, void *p)
+{
+       struct ceph_mds_client *mdsc = (struct ceph_mds_client*)p;
+       u32 cancelled_epoch = 0;
+       int mds_id;
+
+       if (osdc->osdmap->flags & CEPH_OSDMAP_FULL) {
+               cancelled_epoch = ceph_osdc_cancel_writes(osdc, -ENOSPC);
+               if (cancelled_epoch) {
+                       mdsc->cap_epoch_barrier = max(cancelled_epoch + 1,
+                                                     mdsc->cap_epoch_barrier);
+               }
+       }
+
+       dout("handle_osd_map: epoch=%d\n", osdc->osdmap->epoch);
+
+       // Release any cap messages waiting for this epoch
+       for (mds_id = 0; mds_id < mdsc->max_sessions; ++mds_id) {
+               struct ceph_mds_session *session = mdsc->sessions[mds_id];
+               if (session != NULL) {
+                       struct ceph_delayed_message *dm = NULL;
+                       struct ceph_delayed_message *dm_next = NULL;
+                       struct list_head replay_msgs;
+                       INIT_LIST_HEAD(&replay_msgs);
+
+                       dout("find_ready_delayed... (s=%p)\n", session);
+                       mutex_lock(&session->s_mutex);
+                       list_for_each_entry_safe(dm, dm_next,
+                                                &session->s_delayed_msgs,
+                                                dm_item)
+                               find_ready_delayed(session, dm, &replay_msgs,
+                                                  osdc->osdmap->epoch);
+                       mutex_unlock(&session->s_mutex);
+
+                       dout("replay_delayed... (s=%p)\n", session);
+                       list_for_each_entry_safe(dm, dm_next, &replay_msgs,
+                                                dm_item)
+                               replay_delayed(session, dm);
+               }
+       }
+}
+
 /*
  * sessions
  */
@@ -451,6 +539,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        INIT_LIST_HEAD(&s->s_cap_releases_done);
        INIT_LIST_HEAD(&s->s_cap_flushing);
        INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
+       INIT_LIST_HEAD(&s->s_delayed_msgs);
 
        dout("register_session mds%d\n", mds);
        if (mds >= mdsc->max_sessions) {
@@ -488,10 +577,17 @@ fail_realloc:
 static void __unregister_session(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *s)
 {
+       struct ceph_delayed_message *dm;
+       struct ceph_delayed_message *dm_next;
+
        dout("__unregister_session mds%d %p\n", s->s_mds, s);
        BUG_ON(mdsc->sessions[s->s_mds] != s);
        mdsc->sessions[s->s_mds] = NULL;
        ceph_con_close(&s->s_con);
+
+       list_for_each_entry_safe(dm, dm_next, &s->s_delayed_msgs, dm_item)
+               discard_delayed(s, dm);
+
        ceph_put_mds_session(s);
 }
 
@@ -3278,22 +3374,6 @@ static void delayed_work(struct work_struct *work)
        schedule_delayed(mdsc);
 }
 
-/**
- * Call this with map_sem held for read
- */
-static void handle_osd_map(struct ceph_osd_client *osdc, void *p)
-{
-       struct ceph_mds_client *mdsc = (struct ceph_mds_client*)p;
-       u32 cancelled_epoch = 0;
-
-       if (osdc->osdmap->flags & CEPH_OSDMAP_FULL) {
-               cancelled_epoch = ceph_osdc_cancel_writes(osdc, -ENOSPC);
-               if (cancelled_epoch) {
-                       mdsc->cap_epoch_barrier = max(cancelled_epoch + 1,
-                                                     mdsc->cap_epoch_barrier);
-               }
-       }
-}
 
 int ceph_mdsc_init(struct ceph_fs_client *fsc)
 
@@ -3683,7 +3763,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
                handle_forward(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_CAPS:
-               ceph_handle_caps(s, msg);
+               ceph_handle_caps(s, msg, false);
                break;
        case CEPH_MSG_CLIENT_SNAP:
                ceph_handle_snap(mdsc, s, msg);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index b9412a8..e389358 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -151,6 +151,15 @@ struct ceph_mds_session {
        atomic_t          s_ref;
        struct list_head  s_waiting;  /* waiting requests */
        struct list_head  s_unsafe;   /* unsafe requests */
+
+       struct list_head  s_delayed_msgs;   /* OSD epoch waiters */
 };
+
+struct ceph_delayed_message {
+       struct ceph_msg  *dm_msg;
+       u32               dm_epoch;
+       struct list_head  dm_item;
+};
 
 /*
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index aca2287..c6aab54 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -814,7 +814,8 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode)
 /* caps.c */
 extern const char *ceph_cap_string(int c);
 extern void ceph_handle_caps(struct ceph_mds_session *session,
-                            struct ceph_msg *msg);
+                            struct ceph_msg *msg,
+                            bool skip_epoch_check);
 extern struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
                                     struct ceph_cap_reservation *ctx);
 extern void ceph_add_cap(struct inode *inode,
-- 
1.9.3
