Jon,
please check out
git://git.openfabrics.org/~okir/ofed_1_4/linux-2.6.git code-drop-20080703
It has the bcopy changes, and some of the plumbing for RDMA. The
remaining bits aren't complete yet. I'm attaching a patch that contains
my current working state - this doesn't work, and probably doesn't even
compile, but it's for your review.
Olaf
--
Olaf Kirch | --- o --- Nous sommes du soleil we love when we play
[EMAIL PROTECTED] | / | \ sol.dhoop.naytheet.ah kin.ir.samse.qurax
---
net/rds/ib.c | 18 ++
net/rds/ib.h | 36 ++++
net/rds/ib_rdma.c | 401 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
net/rds/ib_send.c | 130 +++++++++++++++++
net/rds/rds.h | 1
net/rds/send.c | 7
6 files changed, 577 insertions(+), 16 deletions(-)
Index: build-2.6/net/rds/ib.c
===================================================================
--- build-2.6.orig/net/rds/ib.c
+++ build-2.6/net/rds/ib.c
@@ -42,11 +42,17 @@
unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE;
unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */
+unsigned int fastreg_pool_size = RDS_FASTREG_POOL_SIZE;
+unsigned int fastreg_message_size = RDS_FASTREG_SIZE + 1; /* +1 allows for unaligned MRs */
module_param(fmr_pool_size, int, 0444);
MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA");
module_param(fmr_message_size, int, 0444);
MODULE_PARM_DESC(fmr_message_size, " Max size of a RDMA transfer");
+module_param(fastreg_pool_size, int, 0444);
+MODULE_PARM_DESC(fastreg_pool_size, " Max number of fastreg MRs per device");
+module_param(fastreg_message_size, int, 0444);
+MODULE_PARM_DESC(fastreg_message_size, " Max size of a RDMA transfer (fastreg MRs)");
struct list_head rds_ib_devices;
@@ -86,9 +92,11 @@ void rds_ib_add_one(struct ib_device *de
rds_ibdev->fmr_page_size = 1 << rds_ibdev->fmr_page_shift;
rds_ibdev->fmr_page_mask = ~((u64) rds_ibdev->fmr_page_size - 1);
rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
- rds_ibdev->max_fmrs = dev_attr->max_fmr?
- min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
- fmr_pool_size;
+ rds_ibdev->max_fmrs = dev_attr->max_fmr;
+
+ /* FIXME: is there a maximum number of fastreg mappings and
+ * a maximum mapping size that we get through device attrs?
+ * For FMRs, is there a maximum mapping size? */
rds_ibdev->dev = device;
rds_ibdev->pd = ib_alloc_pd(device);
@@ -108,6 +116,10 @@ void rds_ib_add_one(struct ib_device *de
if (IS_ERR(rds_ibdev->mr))
goto err_pd;
+ /* Tell the RDMA code to use the fastreg API */
+ if (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS)
+ rds_ibdev->use_fastreg = 1;
+
rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
if (IS_ERR(rds_ibdev->mr_pool)) {
rds_ibdev->mr_pool = NULL;
Index: build-2.6/net/rds/ib.h
===================================================================
--- build-2.6.orig/net/rds/ib.h
+++ build-2.6/net/rds/ib.h
@@ -9,6 +9,8 @@
#define RDS_FMR_SIZE 256
#define RDS_FMR_POOL_SIZE 2048
+#define RDS_FASTREG_SIZE 20
+#define RDS_FASTREG_POOL_SIZE 2048
#define RDS_IB_MAX_SGE 8
#define RDS_IB_RECV_SGE 2
@@ -57,10 +59,37 @@ struct rds_ib_scatterlist {
unsigned int bytes;
};
+/* We need to post a LOCAL_INV request unless f_old_rkey
+ * has this value. */
+#define RDS_IB_INVALID_FASTREG_KEY 0
+
+/* State for one pipelined fastreg (fast memory registration) request.
+ * Created in rds_ib_map_fastreg(), attached to the MR as u.fastreg.pending,
+ * and released via rds_ib_fastreg_release() when the refcount drops to 0. */
+struct rds_ib_fastreg {
+	atomic_t		f_refcnt;	/* dropped by rds_ib_fastreg_release() */
+	unsigned int		f_posted : 1,	/* WRs posted to the send queue */
+				f_done : 1;	/* FAST_REG_MR completed */
+
+	/* rkey of the previous mapping; RDS_IB_INVALID_FASTREG_KEY means
+	 * no LOCAL_INV WR needs to be posted before the FAST_REG_MR. */
+	u32			f_old_rkey;
+
+	u32			f_rkey;		/* rkey of the new mapping */
+	unsigned int		f_length;	/* total bytes mapped */
+
+	struct rds_ib_scatterlist	f_sg;
+
+	struct ib_fast_reg_page_list	*f_page_list;
+	unsigned int		f_page_list_len;
+	unsigned int		f_page_shift;
+
+	struct rds_ib_mr	*f_mr;		/* back-pointer to owning MR */
+};
+
struct rds_ib_send_work {
struct rds_message *s_rm;
+
+ /* We should really put these into a union: */
struct rds_rdma_op *s_op;
+ struct rds_ib_fastreg *s_fastreg;
+
struct ib_send_wr s_wr;
struct ib_sge s_sge[RDS_IB_MAX_SGE];
unsigned long s_queued;
@@ -250,6 +279,8 @@ extern struct ib_client rds_ib_client;
extern unsigned int fmr_pool_size;
extern unsigned int fmr_message_size;
+extern unsigned int fastreg_pool_size;
+extern unsigned int fastreg_message_size;
/* ib_cm.c */
int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp);
@@ -274,6 +305,10 @@ void *rds_ib_get_mr(struct scatterlist *
void rds_ib_sync_mr(void *trans_private, int dir);
void rds_ib_free_mr(void *trans_private, int invalidate);
void rds_ib_flush_mrs(void);
+struct rds_ib_fastreg *rds_ib_rdma_get_fastreg(struct rds_mr *);
+void rds_ib_fastreg_release(struct rds_ib_fastreg *frr);
+void rds_ib_local_inv_complete(struct rds_ib_fastreg *frr, int status);
+void rds_ib_fast_reg_complete(struct rds_ib_fastreg *frr, int status);
/* ib_recv.c */
int __init rds_ib_recv_init(void);
@@ -312,6 +347,7 @@ void rds_ib_send_cq_comp_handler(struct
void rds_ib_send_init_ring(struct rds_ib_connection *ic);
void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op);
+int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr);
void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
Index: build-2.6/net/rds/ib_rdma.c
===================================================================
--- build-2.6.orig/net/rds/ib_rdma.c
+++ build-2.6/net/rds/ib_rdma.c
@@ -46,11 +46,18 @@ struct rds_ib_mr {
struct rds_ib_device *device;
struct rds_ib_mr_pool *pool;
+ spinlock_t lock;
union {
struct ib_fmr *fmr;
- /* fastreg stuff and maybe others go here */
+ struct {
+ struct ib_mr *mr;
+ struct ib_fast_reg_page_list *page_list;
+ u32 rkey;
+ struct rds_ib_fastreg *pending;
+ } fastreg;
} u;
struct list_head list;
+ unsigned int map_seq; /* corresponds to pool->flush_seq */
unsigned int remap_count;
struct rds_ib_scatterlist sg;
@@ -64,19 +71,27 @@ struct rds_ib_mr_pool {
struct mutex flush_lock; /* serialize fmr invalidate */
struct work_struct flush_worker; /* flush worker */
+ atomic_t flush_seq;
spinlock_t list_lock; /* protect variables below */
atomic_t item_count; /* total # of MRs */
atomic_t dirty_count; /* # dirty of MRs */
+ unsigned int can_recycle_dirty : 1;
struct list_head drop_list; /* MRs that have reached their max_maps limit */
struct list_head free_list; /* unused MRs */
struct list_head clean_list; /* unused & unamapped MRs */
+ struct list_head recycle_list; /* dirty recycled MRs */
+ struct list_head fastreg_list; /* pending fastreg's */
atomic_t free_pinned; /* memory pinned by free MRs */
+ unsigned long max_message_size; /* in pages */
unsigned long max_items;
unsigned long max_items_soft;
unsigned long max_free_pinned;
struct ib_fmr_attr fmr_attr;
+ /* Dummy QP used to handle invalidate for fastreg */
+ struct ib_qp *qp;
+
struct rds_ib_mr_pool_ops *op;
};
@@ -98,6 +113,12 @@ static int rds_ib_map_fmr(struct rds_ib_
struct scatterlist *sg, unsigned int nents);
static void rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool, struct list_head *unmap_list);
static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr,
+ struct scatterlist *sg, unsigned int nents);
+static void rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool, struct list_head *unmap_list);
+static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
static struct rds_ib_mr_pool_ops rds_ib_fmr_pool_ops = {
.init = rds_ib_init_fmr,
@@ -106,6 +127,12 @@ static struct rds_ib_mr_pool_ops rds_ib_
.destroy = rds_ib_destroy_fmr,
};
+static struct rds_ib_mr_pool_ops rds_ib_fastreg_pool_ops = {
+ .init = rds_ib_init_fastreg,
+ .map = rds_ib_map_fastreg,
+ .unmap = rds_ib_unmap_fastreg_list,
+ .destroy = rds_ib_destroy_fastreg,
+};
int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
{
@@ -276,28 +303,31 @@ out_unmap:
}
-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+static struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
+ unsigned int message_size, unsigned int pool_size,
+ struct rds_ib_mr_pool_ops *ops)
{
struct rds_ib_mr_pool *pool;
- /* For now, disable all RDMA service on iWARP. This check will
- * go away when we have a working patch. */
- if (rds_ibdev->dev->node_type == RDMA_NODE_RNIC)
- return NULL;
-
pool = kzalloc(sizeof(*pool), GFP_KERNEL);
if (!pool)
return ERR_PTR(-ENOMEM);
- pool->op = &rds_ib_fmr_pool_ops;
+ pool->op = ops;
pool->device = rds_ibdev;
INIT_LIST_HEAD(&pool->free_list);
INIT_LIST_HEAD(&pool->drop_list);
INIT_LIST_HEAD(&pool->clean_list);
+ INIT_LIST_HEAD(&pool->recycle_list);
+ INIT_LIST_HEAD(&pool->fastreg_list);
mutex_init(&pool->flush_lock);
spin_lock_init(&pool->list_lock);
INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
+ pool->max_message_size = message_size;
+ pool->max_items = pool_size;
+ pool->max_free_pinned = pool->max_items * pool->max_message_size / 4;
+
pool->fmr_attr.max_pages = fmr_message_size;
pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
@@ -308,8 +338,49 @@ struct rds_ib_mr_pool *rds_ib_create_mr_
* items more aggressively.
* Make sure that max_items > max_items_soft > max_items / 2
*/
- pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
- pool->max_items = rds_ibdev->max_fmrs;
+ pool->max_items_soft = pool->max_items * 3 / 4;
+
+ return pool;
+}
+
+/* Create the per-device MR pool, using FMRs where available and the
+ * fastreg (IB_DEVICE_MEM_MGT_EXTENSIONS) API otherwise.
+ * Returns an ERR_PTR() on failure. */
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+{
+	struct rds_ib_mr_pool *pool;
+	unsigned int pool_size;
+
+	if (!rds_ibdev->use_fastreg) {
+		/* Use FMRs to implement memory registrations */
+		pool_size = fmr_pool_size;
+
+		/* Honor the device limit on FMRs, if the HCA reports one */
+		if (rds_ibdev->max_fmrs && rds_ibdev->max_fmrs < pool_size)
+			pool_size = rds_ibdev->max_fmrs;
+
+		pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size, pool_size,
+					&rds_ib_fmr_pool_ops);
+
+		if (!IS_ERR(pool)) {
+			pool->fmr_attr.max_pages = pool->max_message_size;
+			pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
+			pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
+		}
+	} else {
+		/* Use fastregs to implement memory registrations */
+		pool_size = fastreg_pool_size;
+
+		pool = __rds_ib_create_mr_pool(rds_ibdev,
+					fastreg_message_size,
+					pool_size,
+					&rds_ib_fastreg_pool_ops);
+
+		if (!IS_ERR(pool)) {
+			/* Only touch the pool after the IS_ERR() check -
+			 * the original code set can_recycle_dirty before
+			 * the check, dereferencing an ERR_PTR. */
+			pool->can_recycle_dirty = 1;
+
+			/* Fill in the blanks:
+			 * create a dummy QP to which we can post LOCAL_INV
+			 * requests when invalidating MRs
+			 */
+			pool->qp = NULL;
+		}
+	}
+	return pool;
+}
@@ -328,6 +399,10 @@ void rds_ib_destroy_mr_pool(struct rds_i
rds_ib_flush_mr_pool(pool, 1);
BUG_ON(atomic_read(&pool->item_count));
BUG_ON(atomic_read(&pool->free_pinned));
+
+ if (pool->qp)
+ ib_destroy_qp(pool->qp);
+
kfree(pool);
}
@@ -341,6 +416,11 @@ static inline struct rds_ib_mr *rds_ib_r
ibmr = list_entry(pool->clean_list.next, struct rds_ib_mr, list);
list_del_init(&ibmr->list);
}
+	if (!ibmr && pool->can_recycle_dirty
+	 && !list_empty(&pool->free_list)) {
+		/* The pool has no dirty_list member (dirty MRs sit on
+		 * free_list / drop_list) - recycle from free_list, which
+		 * is also the list the guard above checks. */
+		ibmr = list_entry(pool->free_list.next, struct rds_ib_mr, list);
+		list_move(&ibmr->list, &pool->recycle_list);
+	}
spin_unlock_irqrestore(&pool->list_lock, flags);
return ibmr;
@@ -386,6 +466,7 @@ static struct rds_ib_mr *rds_ib_alloc_fm
goto out_no_cigar;
}
+ spin_lock_init(&ibmr->lock);
err = pool->op->init(pool, ibmr);
if (err)
goto out_no_cigar;
@@ -430,6 +511,21 @@ void rds_ib_teardown_mr(struct rds_ib_mr
atomic_sub(pinned, &pool->free_pinned);
}
+
+	ibmr->map_seq = 0;
+	if (!list_empty(&ibmr->list)) {
+		struct rds_ib_mr_pool *pool = ibmr->pool;
+		unsigned long flags;
+
+		/* This MR was dirty and got recycled.
+		 * Now we can remove it from the recycle list.
+		 * If there was a pool flush in progress,
+		 * optionally wake up the process waiting for
+		 * this flush to complete */
+		spin_lock_irqsave(&pool->list_lock, flags);
+		list_del(&ibmr->list);
+		/* Was spin_lock_irqrestore() - a typo that would
+		 * deadlock by taking the lock twice and never
+		 * releasing it. */
+		spin_unlock_irqrestore(&pool->list_lock, flags);
+	}
}
static inline unsigned int rds_ib_flush_goal(struct rds_ib_mr_pool *pool, int free_all)
@@ -470,6 +566,7 @@ int rds_ib_flush_mr_pool(struct rds_ib_m
/* Get the list of all MRs to be dropped. Ordering matters -
* we want to put drop_list ahead of free_list. */
list_splice_init(&pool->free_list, &unmap_list);
+ list_splice_init(&pool->recycle_list, &unmap_list);
list_splice_init(&pool->drop_list, &unmap_list);
if (free_all)
list_splice_init(&pool->clean_list, &unmap_list);
@@ -527,12 +624,15 @@ void rds_ib_free_mr(void *trans_private,
if (!pool)
return;
- /* Return it to the pool's free list */
+ /* Return it to the pool's free list.
+ * The mr may be unlinked, or on the recycle list.
+ * Either way, list_move does the right thing.
+ */
spin_lock_irqsave(&pool->list_lock, flags);
if (ibmr->remap_count >= pool->fmr_attr.max_maps) {
- list_add(&ibmr->list, &pool->drop_list);
+ list_move(&ibmr->list, &pool->drop_list);
} else {
- list_add(&ibmr->list, &pool->free_list);
+ list_move(&ibmr->list, &pool->free_list);
}
atomic_add(ibmr->sg.len, &pool->free_pinned);
atomic_inc(&pool->dirty_count);
@@ -691,3 +791,278 @@ static void rds_ib_destroy_fmr(struct rd
ibmr->u.fmr = NULL;
}
+/*
+ * iWARP fastreg handling
+ *
+ * The life cycle of a fastreg registration is a bit different from
+ * FMRs.
+ * The idea behind fastreg is to have one MR, to which we bind different
+ * mappings over time. To avoid stalling on the expensive map and invalidate
+ * operations, these operations are pipelined on the same send queue on
+ * which we want to send the message containing the r_key.
+ *
+ * This creates a bit of a problem for us, as we do not have the destination
+ * IP in GET_MR, so we don't know to which QP to queue these requests (we have
+ * the dest IP in CMSG_RDMA_MAP, though, so we could handle that case more
+ * smartly).
+ *
+ * The way we solve this problem is by creating a rds_ib_fastreg structure and
+ * attaching that to the MR. When the application sends a message and passes
+ * the R_key through CMSG_RDMA_DEST, we look up the referenced MR and check
+ * whether a fastreg request is present (rds_ib_rdma_get_fastreg).
+ *
+ * If a fastreg request is present, rds_ib_xmit will try to queue a LOCAL_INV
+ * (if needed) and a FAST_REG_MR work request before queuing the SEND.
+ * When completions for these arrive, they are dispatched to the RDMA code
+ * (rds_ib_local_inv_complete, rds_ib_fast_reg_complete).
+ *
+ * There is another interesting aspect that's related to invalidation.
+ * The application can request that a mapping is invalidated in FREE_MR.
+ * The expectation there is that this invalidation step includes ALL
+ * PREVIOUSLY FREED MRs.
+ */
+/* Pool op: allocate the fast-reg MR backing a pool entry.
+ * The rkey starts out invalid; a mapping is only installed once a
+ * FAST_REG_MR WR completes (rds_ib_fast_reg_complete). */
+static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
+			struct rds_ib_mr *ibmr)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct ib_mr *mr;
+
+	mr = ib_alloc_fast_reg_mr(rds_ibdev->pd, pool->max_message_size);
+	if (IS_ERR(mr)) {
+		int err = PTR_ERR(mr);
+
+		/* "RDS/iWARP" to match every other fastreg message
+		 * in this file (was "RDS/IWARP"). */
+		printk(KERN_WARNING "RDS/iWARP: ib_alloc_fast_reg_mr failed (err=%d)\n", err);
+		return err;
+	}
+
+	ibmr->u.fastreg.rkey = RDS_IB_INVALID_FASTREG_KEY;
+	ibmr->u.fastreg.mr = mr;
+	return 0;
+}
+
+/* Pool op: build a rds_ib_fastreg describing the new mapping for sg/sg_len
+ * and attach it to the MR as u.fastreg.pending. The actual registration
+ * happens later when __rds_ib_xmit_fastreg posts the WRs. */
+static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
+			struct rds_ib_mr *ibmr,
+			struct scatterlist *sg, unsigned int sg_len)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct ib_fast_reg_page_list *page_list = NULL;
+	struct rds_ib_fastreg *frr;
+	u64 *dma_pages;
+	int i, ret;
+
+	BUG_ON(ibmr->u.fastreg.pending);
+
+	/* Allocate the fastreg request structure */
+	frr = kzalloc(sizeof(*frr), GFP_KERNEL);
+	if (!frr)
+		return -ENOMEM;
+
+	frr->f_mr = ibmr;
+	frr->f_page_shift = rds_ibdev->fmr_page_shift; /* XXX really? */
+	rds_ib_set_scatterlist(&frr->f_sg, sg, sg_len);
+	atomic_set(&frr->f_refcnt, 1);
+
+	dma_pages = rds_ib_map_scatterlist(rds_ibdev, &frr->f_sg,
+				frr->f_page_shift);
+	if (IS_ERR(dma_pages)) {
+		ret = PTR_ERR(dma_pages);
+		goto out;
+	}
+
+	page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, frr->f_sg.dma_npages);
+	if (IS_ERR(page_list)) {
+		ret = PTR_ERR(page_list);
+		printk(KERN_WARNING "RDS/iWARP: ib_alloc_fast_reg_page_list failed (err=%d)\n", ret);
+		goto out;
+	}
+	frr->f_page_list = page_list;
+	/* NOTE(review): f_page_list_len is set from dma_len while the copy
+	 * loop below iterates dma_npages entries - confirm these are meant
+	 * to differ (bytes vs pages?). */
+	frr->f_page_list_len = frr->f_sg.dma_len;
+	frr->f_length = frr->f_sg.bytes;
+
+	for (i = 0; i < frr->f_sg.dma_npages; ++i)
+		page_list->page_list[i] = dma_pages[i];
+
+	ib_update_fast_reg_key(ibmr->u.fastreg.mr, ibmr->remap_count++);
+
+	frr->f_rkey = ibmr->u.fastreg.mr->rkey;
+	frr->f_old_rkey = ibmr->u.fastreg.rkey;
+
+	/* Attach the fastreg info to the MR */
+	ibmr->u.fastreg.pending = frr;
+
+	rds_ib_stats_inc(s_ib_rdma_mr_used);
+	ret = 0;
+
+out:
+	/* NOTE(review): on the page_list allocation failure path,
+	 * frr->f_page_list is still NULL here - rds_ib_fastreg_release()
+	 * must tolerate that (see the NULL check added there). Also verify
+	 * the ownership of dma_pages: it is never freed in this function. */
+	if (ret)
+		rds_ib_fastreg_release(frr);
+
+	return ret;
+}
+
+/* Return the pending fastreg request attached to @mr, or NULL if the MR
+ * has no registration outstanding. On success the caller holds a new
+ * reference and must drop it with rds_ib_fastreg_release(). */
+struct rds_ib_fastreg *rds_ib_rdma_get_fastreg(struct rds_mr *mr)
+{
+	struct rds_ib_mr *ibmr = mr->r_trans_private;
+	struct rds_ib_fastreg *frr;
+	unsigned long flags;
+
+	spin_lock_irqsave(&ibmr->lock, flags);
+	frr = ibmr->u.fastreg.pending;
+	if (frr) {
+		/* FIXME: we need to mark the frr as "locked"
+		 * to prevent FREE_MR from trashing the MR
+		 * as long as the fastreg is on the queue */
+		atomic_inc(&frr->f_refcnt);
+	}
+	spin_unlock_irqrestore(&ibmr->lock, flags);
+
+	return frr;
+}
+
+/* Drop one reference on @frr; frees it (page list, scatterlist pins and
+ * the structure itself) when the last reference goes away. */
+void rds_ib_fastreg_release(struct rds_ib_fastreg *frr)
+{
+	struct rds_ib_device *rds_ibdev;
+
+	if (atomic_dec_and_test(&frr->f_refcnt)) {
+		/* f_page_list is still NULL when we are called from the
+		 * rds_ib_map_fastreg() error path before the page list
+		 * was allocated - don't free it unconditionally. */
+		if (frr->f_page_list)
+			ib_free_fast_reg_page_list(frr->f_page_list);
+		rds_ibdev = frr->f_mr->device;
+		rds_ib_drop_scatterlist(rds_ibdev, &frr->f_sg);
+		kfree(frr);
+	}
+}
+
+/*
+ * These functions are called back from the send CQ handler
+ * when the LOCAL_INV or FAST_REG_MR WRs complete.
+ */
+/* Send-CQ callback: a LOCAL_INV WR for @frr completed with @status.
+ * On success, if the invalidated rkey was the MR's current mapping,
+ * the MR's pinned memory can now be torn down. Drops the WR's ref. */
+void rds_ib_local_inv_complete(struct rds_ib_fastreg *frr, int status)
+{
+	struct rds_ib_mr *ibmr = frr->f_mr;
+
+	spin_lock(&ibmr->lock);
+	if (ibmr->u.fastreg.pending != frr)
+		goto out_unlock;
+
+	if (status != IB_WC_SUCCESS) {
+		/* Yikes. Invalidation failed. What can we do but complain? */
+		printk(KERN_NOTICE "RDS/iWARP: Unable to invalidate fastreg MR.\n");
+		goto out_unlock;
+	}
+
+	if (frr->f_old_rkey == ibmr->u.fastreg.rkey) {
+		/* Named constant instead of a bare 0 - same value, but
+		 * the intent is "no valid mapping installed". */
+		ibmr->u.fastreg.rkey = RDS_IB_INVALID_FASTREG_KEY;
+		/* Now we can unpin any memory pinned for this MR. */
+		rds_ib_teardown_mr(ibmr);
+	}
+	frr->f_old_rkey = RDS_IB_INVALID_FASTREG_KEY;
+
+out_unlock:
+	spin_unlock(&ibmr->lock);
+
+	/* The WR owned a reference to this frr. Drop it */
+	rds_ib_fastreg_release(frr);
+}
+
+/* Send-CQ callback: a FAST_REG_MR WR for @frr completed with @status.
+ * On success, install the new mapping (sg, page list, rkey) in the MR
+ * and detach the pending fastreg. Drops the WR's ref on return. */
+void rds_ib_fast_reg_complete(struct rds_ib_fastreg *frr, int status)
+{
+	struct rds_ib_mr *ibmr = frr->f_mr;
+
+	spin_lock(&ibmr->lock);
+
+	/* Technically, this would be a bug */
+	if (ibmr->u.fastreg.pending != frr)
+		goto out_unlock;
+
+	if (status != IB_WC_SUCCESS) {
+		/* Yikes. We were unable to register the application's
+		 * memory. We have no way of notifying the application.
+		 * We could probably tear down the QP and cry uncle, but
+		 * the SEND may already have gone out.
+		 * The only solace is that the RDMA initiated by the remote
+		 * will fail, because the key isn't valid.
+		 */
+		if (printk_ratelimit())
+			printk(KERN_NOTICE "RDS/iWARP: Unable to "
+				"perform fast memory registration.\n");
+		goto out_unlock;
+	}
+
+	ibmr->sg = frr->f_sg;
+	ibmr->u.fastreg.page_list = frr->f_page_list;
+	ibmr->u.fastreg.rkey = frr->f_rkey;
+	ibmr->map_seq = atomic_read(&ibmr->pool->flush_seq);
+
+	/* Mark the fastreg done BEFORE dropping the pending reference -
+	 * the release below may free frr, and the original code wrote
+	 * f_done after it (use-after-free). */
+	frr->f_done = 1;
+
+	/* Detach frr from MR. We still have at least one ref after this */
+	ibmr->u.fastreg.pending = NULL;
+	rds_ib_fastreg_release(frr);
+
+out_unlock:
+	spin_unlock(&ibmr->lock);
+
+	/* The WR owned a reference to this frr. Drop it */
+	rds_ib_fastreg_release(frr);
+}
+
+static void rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
+				struct list_head *unmap_list)
+{
+	unsigned int flush_seq = atomic_read(&pool->flush_seq);
+	struct rds_ib_mr *ibmr, *next;
+	LIST_HEAD(laundered);
+
+	/* Batched invalidation of fastreg MRs.
+	 * Why do we do it this way, even though we could pipeline unmap
+	 * and remap? The reason is the application semantics - when the
+	 * application requests an invalidation of MRs, it expects all
+	 * previously released R_Keys to become invalid.
+	 *
+	 * If we implement MR reuse naively, we risk memory corruption
+	 * (this has actually been observed). So the default behavior
+	 * requires that a MR goes through an explicit unmap operation before
+	 * we can reuse it again.
+	 *
+	 * We could probably improve on this a little, by allowing immediate
+	 * reuse of a MR on the same socket (eg you could add small
+	 * cache of unused MRs to strct rds_socket - GET_MR could grab one
+	 * of these without requiring an explicit invalidate).
+	 */
+	while (!list_empty(unmap_list)) {
+		unsigned long flags;
+
+		/* We need to take the lock, because recycled MRs may
+		 * ooze from the unmap list back to the clean list */
+		spin_lock_irqsave(&pool->list_lock, flags);
+		list_for_each_entry_safe(ibmr, next, unmap_list, list) {
+			if (ibmr->map_seq == 0
+			 || ibmr->map_seq >= flush_seq)
+				list_move(&ibmr->list, &laundered);
+
+			/* Do we need to submit a WR to pool->qp?
+			 * (Was ibmr->fastreg.pending - the fastreg state
+			 * lives inside the u union.) */
+			if (ibmr->u.fastreg.pending) {
+				/* FIXME: create a dummy fastreg that
+				 * has f_old_rkey set, but no page_list. */
+			}
+		}
+		/* Unlock once per pass, after the list walk - the original
+		 * unlocked inside the loop body (and had an unterminated
+		 * comment swallowing the closing brace). */
+		spin_unlock_irqrestore(&pool->list_lock, flags);
+	}
+
+	/* Fill in the blanks:
+	   Go through the list of dirty MRs, and post LOCAL_INV WRs to the
+	   dummy pool->qp. When the completion for the last WR arrives,
+	   the CQ handler wakes up the caller.
+	 */
+	BUG(); /* not implemented yet. */
+}
+
+/* Pool op: free everything hanging off a fastreg MR - the installed page
+ * list, the fast-reg MR itself, and any still-pending fastreg request. */
+static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool,
+			struct rds_ib_mr *ibmr)
+{
+	if (ibmr->u.fastreg.page_list)
+		ib_free_fast_reg_page_list(ibmr->u.fastreg.page_list);
+	if (ibmr->u.fastreg.mr)
+		ib_dereg_mr(ibmr->u.fastreg.mr);
+	if (ibmr->u.fastreg.pending)
+		rds_ib_fastreg_release(ibmr->u.fastreg.pending);
+}
+
+
Index: build-2.6/net/rds/ib_send.c
===================================================================
--- build-2.6.orig/net/rds/ib_send.c
+++ build-2.6/net/rds/ib_send.c
@@ -135,6 +135,7 @@ void rds_ib_send_init_ring(struct rds_ib
send->s_rm = NULL;
send->s_op = NULL;
+ send->s_fastreg = NULL;
send->s_wr.wr_id = i;
send->s_wr.sg_list = send->s_sge;
@@ -165,6 +166,8 @@ void rds_ib_send_clear_ring(struct rds_i
rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
if (send->s_op)
rds_ib_send_unmap_rdma(ic, send->s_op);
+ if (send->s_fastreg)
+ rds_ib_fastreg_release(send->s_fastreg);
}
}
@@ -223,6 +226,16 @@ void rds_ib_send_cq_comp_handler(struct
/* Nothing to be done - the SG list will be unmapped
* when the SEND completes. */
break;
+ case IB_WR_LOCAL_INV:
+ /* We invalidated an r_key. the caller may want to
+ * learn about this. */
+ if (send->s_fastreg)
+ rds_ib_local_inv_complete(send->s_fastreg, wc.status);
+ break;
+ case IB_WR_FAST_REG_MR:
+ if (send->s_fastreg)
+ rds_ib_fast_reg_complete(send->s_fastreg, wc.status);
+ break;
default:
if (printk_ratelimit())
printk(KERN_NOTICE
@@ -484,6 +497,21 @@ int rds_ib_xmit(struct rds_connection *c
BUG_ON(off % RDS_FRAG_SIZE);
BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
+ /* Fastreg support */
+ if (rds_rdma_cookie_key(rm->m_rdma_cookie)
+ && ic->i_fastreg
+ && !test_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags)) {
+ ret = rds_ib_xmit_fastreg(conn, rm->m_rdma_mr);
+ if (ret)
+ goto out;
+
+ /* We don't release the fastreg yet - we can only
+ * do that when it has completed. If the connection
+ * goes down, and we re-queue the message, we would
+ * have to retry the registration. */
+ set_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags);
+ }
+
/* FIXME we may overallocate here */
if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
i = 1;
@@ -849,6 +877,108 @@ out:
return ret;
}
+/* Post the (up to two) WRs that carry out a fastreg: an optional
+ * LOCAL_INV of the old rkey followed by the FAST_REG_MR itself.
+ * Returns 0 on success or when nothing needs posting. */
+static int __rds_ib_xmit_fastreg(struct rds_connection *conn,
+			struct rds_ib_fastreg *frr)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rds_ib_send_work *send = NULL;
+	struct rds_ib_send_work *first;
+	struct ib_send_wr *failed_wr;
+	u32 pos;
+	u32 work_alloc = 0;
+	int ret;
+	int num_wrs;
+
+	/*
+	 * Perform 2 WRs for the fast_reg_mr's and chain them together. The
+	 * first WR is used to invalidate the old rkey, and the second WR is
+	 * used to define the new fast_reg_mr request. Each individual page
+	 * in the sg list is added to the fast reg page list and placed
+	 * inside the fast_reg_mr WR. The key used is a rolling 8bit
+	 * counter, which should guarantee uniqueness.
+	 */
+	num_wrs = 0;
+	if (frr->f_old_rkey != RDS_IB_INVALID_FASTREG_KEY)
+		num_wrs++;
+	if (frr->f_page_list)
+		num_wrs++;
+	if (!num_wrs)
+		return 0;
+
+	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, num_wrs, &pos);
+	if (work_alloc != num_wrs) {
+		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		rds_ib_stats_inc(s_ib_tx_ring_full);
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	first = send = &ic->i_sends[pos];
+
+	if (frr->f_old_rkey != RDS_IB_INVALID_FASTREG_KEY) {
+		memset(send, 0, sizeof(*send));
+		send->s_wr.opcode = IB_WR_LOCAL_INV;
+		send->s_wr.ex.invalidate_rkey = frr->f_old_rkey;
+		send->s_fastreg = frr;
+		send->s_queued = jiffies;
+
+		/* Get the next WR */
+		pos = (pos + 1) % ic->i_send_ring.w_nr;
+		send = &ic->i_sends[pos];
+	}
+
+	if (frr->f_page_list) {
+		memset(send, 0, sizeof(*send));
+		send->s_wr.opcode = IB_WR_FAST_REG_MR;
+		send->s_wr.wr.fast_reg.length = frr->f_length;
+		send->s_wr.wr.fast_reg.rkey = frr->f_rkey;
+		send->s_wr.wr.fast_reg.page_list = frr->f_page_list;
+		send->s_wr.wr.fast_reg.page_list_len = frr->f_page_list_len;
+		send->s_wr.wr.fast_reg.page_shift = frr->f_page_shift;
+		send->s_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
+					IB_ACCESS_REMOTE_READ |
+					IB_ACCESS_REMOTE_WRITE;
+		send->s_fastreg = frr;
+		send->s_queued = jiffies;
+	}
+
+	/* Each posted WR owns one reference on frr, dropped by the
+	 * completion handlers. */
+	atomic_add(num_wrs, &frr->f_refcnt);
+
+	/* Chain the two WRs together */
+	if (num_wrs == 2)
+		first->s_wr.next = &send->s_wr;
+
+	failed_wr = &first->s_wr;
+	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
+
+	rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
+		first, &first->s_wr, ret, failed_wr);
+	BUG_ON(failed_wr != &first->s_wr);
+	if (ret) {
+		printk(KERN_WARNING "RDS/IB: fastreg ib_post_send to %u.%u.%u.%u "
+			"returned %d\n", NIPQUAD(conn->c_faddr), ret);
+		/* Drop the references the WRs would have owned */
+		while (num_wrs--)
+			rds_ib_fastreg_release(frr);
+		/* Give back exactly what we allocated - the original
+		 * hard-coded 2 here, corrupting the ring accounting when
+		 * only one WR had been allocated. */
+		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		return ret;
+	}
+
+out:
+	return ret;
+}
+
+/* Post any pending fastreg for @mr on @conn's send queue.
+ * Returns 0 if there is nothing to do, or the posting result.
+ * NOTE(review): rds_ib_rdma_get_fastreg() takes a reference that is
+ * apparently never dropped on this path - the WR completions only drop
+ * the per-WR references added in __rds_ib_xmit_fastreg(). Looks like a
+ * frr leak; confirm against the intended lifetime before reuse. */
+int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr)
+{
+	struct rds_ib_fastreg *frr;
+
+	frr = rds_ib_rdma_get_fastreg(mr);
+	if (!frr)
+		return 0;
+	if (IS_ERR(frr))
+		return PTR_ERR(frr);
+	return __rds_ib_xmit_fastreg(conn, frr);
+}
+
+
void rds_ib_xmit_complete(struct rds_connection *conn)
{
struct rds_ib_connection *ic = conn->c_transport_data;
Index: build-2.6/net/rds/rds.h
===================================================================
--- build-2.6.orig/net/rds/rds.h
+++ build-2.6/net/rds/rds.h
@@ -278,6 +278,7 @@ struct rds_incoming {
#define RDS_MSG_RETRANSMITTED 5
#define RDS_MSG_MAPPED 6
#define RDS_MSG_PAGEVEC 7
+#define RDS_MSG_FASTREG_POSTED 8
struct rds_message {
atomic_t m_refcount;
Index: build-2.6/net/rds/send.c
===================================================================
--- build-2.6.orig/net/rds/send.c
+++ build-2.6/net/rds/send.c
@@ -84,6 +84,10 @@ void rds_send_reset(struct rds_connectio
list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
+ /* If we were in the process of performing a fastreg
+ * memory registration when the connection went down,
+ * we have to retry it. */
+ clear_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags);
}
list_splice_init(&conn->c_retrans, &conn->c_send_queue);
spin_unlock_irqrestore(&conn->c_lock, flags);
@@ -765,6 +769,9 @@ static int rds_cmsg_send(struct rds_sock
if (cmsg->cmsg_level != SOL_RDS)
continue;
+ /* As a side effect, RDMA_DEST and RDMA_MAP will set
+ * rm->m_rdma_cookie and rm->m_rdma_mr.
+ */
switch (cmsg->cmsg_type) {
case RDS_CMSG_RDMA_ARGS:
ret = rds_cmsg_rdma_args(rs, rm, cmsg);
_______________________________________________
general mailing list
[email protected]
http://lists.openfabrics.org/cgi-bin/mailman/listinfo/general
To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general