Jon,

please check out
git://git.openfabrics.org/~okir/ofed_1_4/linux-2.6.git code-drop-20080703

It has the bcopy changes, and some of the plumbing for RDMA. The
remaining bits aren't complete yet. I'm attaching a patch that contains
my current working state - this doesn't work, and probably doesn't even
compile, but it's for your review.

Olaf
-- 
Olaf Kirch  |  --- o --- Nous sommes du soleil we love when we play
[EMAIL PROTECTED] |    / | \   sol.dhoop.naytheet.ah kin.ir.samse.qurax
---
 net/rds/ib.c      |   18 ++
 net/rds/ib.h      |   36 ++++
 net/rds/ib_rdma.c |  401 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
 net/rds/ib_send.c |  130 +++++++++++++++++
 net/rds/rds.h     |    1 
 net/rds/send.c    |    7 
 6 files changed, 577 insertions(+), 16 deletions(-)

Index: build-2.6/net/rds/ib.c
===================================================================
--- build-2.6.orig/net/rds/ib.c
+++ build-2.6/net/rds/ib.c
@@ -42,11 +42,17 @@
 
 unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE;
 unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */
+unsigned int fastreg_pool_size = RDS_FASTREG_POOL_SIZE;
+unsigned int fastreg_message_size = RDS_FASTREG_SIZE + 1; /* +1 allows for unaligned MRs */
 
 module_param(fmr_pool_size, int, 0444);
 MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA");
 module_param(fmr_message_size, int, 0444);
 MODULE_PARM_DESC(fmr_message_size, " Max size of a RDMA transfer");
+module_param(fastreg_pool_size, int, 0444);
+MODULE_PARM_DESC(fastreg_pool_size, " Max number of fastreg MRs per device");
+module_param(fastreg_message_size, int, 0444);
+MODULE_PARM_DESC(fastreg_message_size, " Max size of a RDMA transfer (fastreg MRs)");
 
 struct list_head rds_ib_devices;
 
@@ -86,9 +92,11 @@ void rds_ib_add_one(struct ib_device *de
 	rds_ibdev->fmr_page_size  = 1 << rds_ibdev->fmr_page_shift;
 	rds_ibdev->fmr_page_mask  = ~((u64) rds_ibdev->fmr_page_size - 1);
 	rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
-	rds_ibdev->max_fmrs = dev_attr->max_fmr?
-			min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
-			fmr_pool_size;
+	rds_ibdev->max_fmrs = dev_attr->max_fmr;
+
+	/* FIXME: is there a maximum number of fastreg mappings and
+	 * a maximum mapping size that we get through device attrs?
+	 * For FMRs, is there a maximum mapping size? */
 
 	rds_ibdev->dev = device;
 	rds_ibdev->pd = ib_alloc_pd(device);
@@ -108,6 +116,10 @@ void rds_ib_add_one(struct ib_device *de
 	if (IS_ERR(rds_ibdev->mr))
 		goto err_pd;
 
+	/* Tell the RDMA code to use the fastreg API */
+	if (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS)
+		rds_ibdev->use_fastreg = 1;
+
 	rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
 	if (IS_ERR(rds_ibdev->mr_pool)) {
 		rds_ibdev->mr_pool = NULL;
Index: build-2.6/net/rds/ib.h
===================================================================
--- build-2.6.orig/net/rds/ib.h
+++ build-2.6/net/rds/ib.h
@@ -9,6 +9,8 @@
 
 #define RDS_FMR_SIZE			256
 #define RDS_FMR_POOL_SIZE		2048
+#define RDS_FASTREG_SIZE		20
+#define RDS_FASTREG_POOL_SIZE		2048
 
 #define RDS_IB_MAX_SGE			8
 #define RDS_IB_RECV_SGE 		2
@@ -57,10 +59,37 @@ struct rds_ib_scatterlist {
 	unsigned int		bytes;
 };
 
+/* We need to post a LOCAL_INV request unless f_old_rkey
+ * has this value. */
+#define RDS_IB_INVALID_FASTREG_KEY 0
+
+struct rds_ib_fastreg {
+	atomic_t		f_refcnt;
+	unsigned int		f_posted : 1,
+				f_done : 1;
+
+	u32			f_old_rkey;
+
+	u32			f_rkey;
+	unsigned int		f_length;
+
+	struct rds_ib_scatterlist f_sg;
+
+	struct ib_fast_reg_page_list *f_page_list;
+	unsigned int		f_page_list_len;
+	unsigned int		f_page_shift;
+
+	struct rds_ib_mr	*f_mr;
+};
+
 
 struct rds_ib_send_work {
 	struct rds_message	*s_rm;
+
+	/* We should really put these into a union: */
 	struct rds_rdma_op	*s_op;
+	struct rds_ib_fastreg	*s_fastreg;
+
 	struct ib_send_wr	s_wr;
 	struct ib_sge		s_sge[RDS_IB_MAX_SGE];
 	unsigned long		s_queued;
@@ -250,6 +279,8 @@ extern struct ib_client rds_ib_client;
 
 extern unsigned int fmr_pool_size;
 extern unsigned int fmr_message_size;
+extern unsigned int fastreg_pool_size;
+extern unsigned int fastreg_message_size;
 
 /* ib_cm.c */
 int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp);
@@ -274,6 +305,10 @@ void *rds_ib_get_mr(struct scatterlist *
 void rds_ib_sync_mr(void *trans_private, int dir);
 void rds_ib_free_mr(void *trans_private, int invalidate);
 void rds_ib_flush_mrs(void);
+struct rds_ib_fastreg *rds_ib_rdma_get_fastreg(struct rds_mr *);
+void rds_ib_fastreg_release(struct rds_ib_fastreg *frr);
+void rds_ib_local_inv_complete(struct rds_ib_fastreg *frr, int status);
+void rds_ib_fast_reg_complete(struct rds_ib_fastreg *frr, int status);
 
 /* ib_recv.c */
 int __init rds_ib_recv_init(void);
@@ -312,6 +347,7 @@ void rds_ib_send_cq_comp_handler(struct 
 void rds_ib_send_init_ring(struct rds_ib_connection *ic);
 void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
 int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op);
+int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr);
 void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
 void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
 int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
Index: build-2.6/net/rds/ib_rdma.c
===================================================================
--- build-2.6.orig/net/rds/ib_rdma.c
+++ build-2.6/net/rds/ib_rdma.c
@@ -46,11 +46,18 @@ struct rds_ib_mr {
 	struct rds_ib_device	*device;
 	struct rds_ib_mr_pool	*pool;
 
+	spinlock_t		lock;
 	union {
 	    struct ib_fmr	*fmr;
-	    /* fastreg stuff and maybe others go here */
+	    struct {
+		struct ib_mr	*mr;
+		struct ib_fast_reg_page_list *page_list;
+		u32		rkey;
+		struct rds_ib_fastreg *pending;
+	    } fastreg;
 	} u;
 	struct list_head	list;
+	unsigned int		map_seq;		/* corresponds to pool->flush_seq */
 	unsigned int		remap_count;
 
 	struct rds_ib_scatterlist sg;
@@ -64,19 +71,27 @@ struct rds_ib_mr_pool {
 
 	struct mutex		flush_lock;		/* serialize fmr invalidate */
 	struct work_struct	flush_worker;		/* flush worker */
+	atomic_t		flush_seq;
 
 	spinlock_t		list_lock;		/* protect variables below */
 	atomic_t		item_count;		/* total # of MRs */
 	atomic_t		dirty_count;		/* # dirty of MRs */
+	unsigned int		can_recycle_dirty : 1;
 	struct list_head	drop_list;		/* MRs that have reached their max_maps limit */
 	struct list_head	free_list;		/* unused MRs */
 	struct list_head	clean_list;		/* unused & unamapped MRs */
+	struct list_head	recycle_list;		/* dirty recycled MRs */
+	struct list_head	fastreg_list;		/* pending fastreg's */
 	atomic_t		free_pinned;		/* memory pinned by free MRs */
+	unsigned long		max_message_size;	/* in pages */
 	unsigned long		max_items;
 	unsigned long		max_items_soft;
 	unsigned long		max_free_pinned;
 	struct ib_fmr_attr	fmr_attr;
 
+	/* Dummy QP used to handle invalidate for fastreg */
+	struct ib_qp		*qp;
+
 	struct rds_ib_mr_pool_ops *op;
 };
 
@@ -98,6 +113,12 @@ static int rds_ib_map_fmr(struct rds_ib_
 			  struct scatterlist *sg, unsigned int nents);
 static void rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool, struct list_head *unmap_list);
 static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
+			  struct rds_ib_mr *ibmr,
+	       		  struct scatterlist *sg, unsigned int nents);
+static void rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool, struct list_head *unmap_list);
+static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
 
 static struct rds_ib_mr_pool_ops rds_ib_fmr_pool_ops = {
 	.init		= rds_ib_init_fmr,
@@ -106,6 +127,12 @@ static struct rds_ib_mr_pool_ops rds_ib_
 	.destroy	= rds_ib_destroy_fmr,
 };
 
+static struct rds_ib_mr_pool_ops rds_ib_fastreg_pool_ops = {
+	.init		= rds_ib_init_fastreg,
+	.map		= rds_ib_map_fastreg,
+	.unmap		= rds_ib_unmap_fastreg_list,
+	.destroy	= rds_ib_destroy_fastreg,
+};
 
 int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
 {
@@ -276,28 +303,31 @@ out_unmap:
 }
 
 
-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+static struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
+		unsigned int message_size, unsigned int pool_size,
+		struct rds_ib_mr_pool_ops *ops)
 {
 	struct rds_ib_mr_pool *pool;
 
-	/* For now, disable all RDMA service on iWARP. This check will
-	 * go away when we have a working patch. */
-	if (rds_ibdev->dev->node_type == RDMA_NODE_RNIC)
-		return NULL;
-
 	pool = kzalloc(sizeof(*pool), GFP_KERNEL);
 	if (!pool)
 		return ERR_PTR(-ENOMEM);
 
-	pool->op = &rds_ib_fmr_pool_ops;
+	pool->op = ops;
 	pool->device = rds_ibdev;
 	INIT_LIST_HEAD(&pool->free_list);
 	INIT_LIST_HEAD(&pool->drop_list);
 	INIT_LIST_HEAD(&pool->clean_list);
+	INIT_LIST_HEAD(&pool->recycle_list);
+	INIT_LIST_HEAD(&pool->fastreg_list);
 	mutex_init(&pool->flush_lock);
 	spin_lock_init(&pool->list_lock);
 	INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
 
+	pool->max_message_size = message_size;
+	pool->max_items = pool_size;
+	pool->max_free_pinned = pool->max_items * pool->max_message_size / 4;
+
 	pool->fmr_attr.max_pages = fmr_message_size;
 	pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
 	pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
@@ -308,8 +338,49 @@ struct rds_ib_mr_pool *rds_ib_create_mr_
 	 * items more aggressively.
 	 * Make sure that max_items > max_items_soft > max_items / 2
 	 */
-	pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
-	pool->max_items = rds_ibdev->max_fmrs;
+	pool->max_items_soft = pool->max_items * 3 / 4;
+
+	return pool;
+}
+
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+{
+	struct rds_ib_mr_pool *pool;
+	unsigned int pool_size;
+
+	if (!rds_ibdev->use_fastreg) {
+		/* Use FMRs to implement memory registrations */
+		pool_size = fmr_pool_size;
+
+		if (rds_ibdev->max_fmrs && rds_ibdev->max_fmrs < pool_size)
+			pool_size = rds_ibdev->max_fmrs;
+
+		pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size, pool_size,
+					&rds_ib_fmr_pool_ops);
+
+		if (!IS_ERR(pool)) {
+			pool->fmr_attr.max_pages = pool->max_message_size;
+			pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
+			pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
+		}
+	} else {
+		/* Use fastregs to implement memory registrations */
+		pool_size = fastreg_pool_size;
+
+		pool = __rds_ib_create_mr_pool(rds_ibdev,
+					fastreg_message_size,
+					pool_size,
+					&rds_ib_fastreg_pool_ops);
+
+		if (!IS_ERR(pool)) {
+			pool->can_recycle_dirty = 1;
+			/* Fill in the blanks:
+			 *  create a dummy QP to which we can post LOCAL_INV
+			 *  requests when invalidating MRs
+			 */
+			pool->qp = NULL;
+		}
+	}
 
 	return pool;
 }
@@ -328,6 +399,10 @@ void rds_ib_destroy_mr_pool(struct rds_i
 	rds_ib_flush_mr_pool(pool, 1);
 	BUG_ON(atomic_read(&pool->item_count));
 	BUG_ON(atomic_read(&pool->free_pinned));
+
+	if (pool->qp)
+		ib_destroy_qp(pool->qp);
+
 	kfree(pool);
 }
 
@@ -341,6 +416,11 @@ static inline struct rds_ib_mr *rds_ib_r
 		ibmr = list_entry(pool->clean_list.next, struct rds_ib_mr, list);
 		list_del_init(&ibmr->list);
 	}
+	if (!ibmr && pool->can_recycle_dirty
+	 && !list_empty(&pool->free_list)) {
+		ibmr = list_entry(pool->free_list.next, struct rds_ib_mr, list);
+		list_move(&ibmr->list, &pool->recycle_list);
+	}
 	spin_unlock_irqrestore(&pool->list_lock, flags);
 
 	return ibmr;
@@ -386,6 +466,7 @@ static struct rds_ib_mr *rds_ib_alloc_fm
 		goto out_no_cigar;
 	}
 
+	spin_lock_init(&ibmr->lock);
 	err = pool->op->init(pool, ibmr);
 	if (err)
 		goto out_no_cigar;
@@ -430,6 +511,21 @@ void rds_ib_teardown_mr(struct rds_ib_mr
 
 		atomic_sub(pinned, &pool->free_pinned);
 	}
+
+	ibmr->map_seq = 0;
+	if (!list_empty(&ibmr->list)) {
+		struct rds_ib_mr_pool *pool = ibmr->pool;
+		unsigned long flags;
+
+		/* This MR was dirty and got recycled.
+		 * Now we can remove it from the recycle list.
+		 * If there was a pool flush in progress,
+		 * optionally wake up the process waiting for
+		 * this flush to complete */
+		spin_lock_irqsave(&pool->list_lock, flags);
+		list_del(&ibmr->list);
+		spin_unlock_irqrestore(&pool->list_lock, flags);
+	}
 }
 
 static inline unsigned int rds_ib_flush_goal(struct rds_ib_mr_pool *pool, int free_all)
@@ -470,6 +566,7 @@ int rds_ib_flush_mr_pool(struct rds_ib_m
 	/* Get the list of all MRs to be dropped. Ordering matters -
 	 * we want to put drop_list ahead of free_list. */
 	list_splice_init(&pool->free_list, &unmap_list);
+	list_splice_init(&pool->recycle_list, &unmap_list);
 	list_splice_init(&pool->drop_list, &unmap_list);
 	if (free_all)
 		list_splice_init(&pool->clean_list, &unmap_list);
@@ -527,12 +624,15 @@ void rds_ib_free_mr(void *trans_private,
 	if (!pool)
 		return;
 
-	/* Return it to the pool's free list */
+	/* Return it to the pool's free list.
+	 * The mr may be unlinked, or on the recycle list.
+	 * Either way, list_move does the right thing.
+	 */
 	spin_lock_irqsave(&pool->list_lock, flags);
 	if (ibmr->remap_count >= pool->fmr_attr.max_maps) {
-		list_add(&ibmr->list, &pool->drop_list);
+		list_move(&ibmr->list, &pool->drop_list);
 	} else {
-		list_add(&ibmr->list, &pool->free_list);
+		list_move(&ibmr->list, &pool->free_list);
 	}
 	atomic_add(ibmr->sg.len, &pool->free_pinned);
 	atomic_inc(&pool->dirty_count);
@@ -691,3 +791,278 @@ static void rds_ib_destroy_fmr(struct rd
 	ibmr->u.fmr = NULL;
 }
 
+/*
+ * iWARP fastreg handling
+ *
+ * The life cycle of a fastreg registration is a bit different from
+ * FMRs.
+ * The idea behind fastreg is to have one MR, to which we bind different
+ * mappings over time. To avoid stalling on the expensive map and invalidate
+ * operations, these operations are pipelined on the same send queue on
+ * which we want to send the message containing the r_key.
+ *
+ * This creates a bit of a problem for us, as we do not have the destination
+ * IP in GET_MR, so we don't know to which QP to queue these requests (we have
+ * the dest IP in CMSG_RDMA_MAP, though, so we could handle that case more
+ * smartly).
+ *
+ * The way we solve this problem is by creating a rds_ib_fastreg structure and
+ * attaching that to the MR. When the application sends a message and passes
+ * the R_key through CMSG_RDMA_DEST, we look up the referenced MR and check
+ * whether a fastreg request is present (rds_ib_rdma_get_fastreg).
+ *
+ * If a fastreg request is present, rds_ib_xmit will try to queue a LOCAL_INV
+ * (if needed) and a FAST_REG_MR work request before queuing the SEND.
+ * When completions for these arrive, they are dispatched to the RDMA code
+ * (rds_ib_local_inv_complete, rds_ib_fast_reg_complete).
+ *
+ * There is another interesting aspect that's related to invalidation.
+ * The application can request that a mapping is invalidated in FREE_MR.
+ * The expectation there is that this invalidation step includes ALL
+ * PREVIOUSLY FREED MRs.
+ */
+static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
+				struct rds_ib_mr *ibmr)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct ib_mr *mr;
+
+	mr = ib_alloc_fast_reg_mr(rds_ibdev->pd, pool->max_message_size);
+	if (IS_ERR(mr)) {
+		int err = PTR_ERR(mr);
+
+		printk(KERN_WARNING "RDS/iWARP: ib_alloc_fast_reg_mr failed (err=%d)\n", err);
+		return err;
+	}
+
+	ibmr->u.fastreg.rkey = RDS_IB_INVALID_FASTREG_KEY;
+	ibmr->u.fastreg.mr = mr;
+	return 0;
+}
+
+static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
+			struct rds_ib_mr *ibmr,
+	       		struct scatterlist *sg, unsigned int sg_len)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct ib_fast_reg_page_list *page_list = NULL;
+	struct rds_ib_fastreg *frr;
+	u64 *dma_pages;
+	int i, ret;
+
+	BUG_ON(ibmr->u.fastreg.pending);
+
+	/* Allocate the fastreg request structure */
+	frr = kzalloc(sizeof(*frr), GFP_KERNEL);
+	if (!frr)
+		return -ENOMEM;
+
+	frr->f_mr = ibmr;
+	frr->f_page_shift = rds_ibdev->fmr_page_shift;	/* XXX really? */
+	rds_ib_set_scatterlist(&frr->f_sg, sg, sg_len);
+	atomic_set(&frr->f_refcnt, 1);
+
+	dma_pages = rds_ib_map_scatterlist(rds_ibdev, &frr->f_sg,
+				frr->f_page_shift);
+	if (IS_ERR(dma_pages)) {
+		ret = PTR_ERR(dma_pages);
+		goto out;
+	}
+
+	page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, frr->f_sg.dma_npages);
+	if (IS_ERR(page_list)) {
+		ret = PTR_ERR(page_list);
+		printk(KERN_WARNING "RDS/iWARP: ib_alloc_fast_reg_page_list failed (err=%d)\n", ret);
+		goto out;
+	}
+	frr->f_page_list = page_list;
+	frr->f_page_list_len = frr->f_sg.dma_len;
+	frr->f_length = frr->f_sg.bytes;
+
+	for (i = 0; i < frr->f_sg.dma_npages; ++i)
+		page_list->page_list[i] = dma_pages[i];
+
+	ib_update_fast_reg_key(ibmr->u.fastreg.mr, ibmr->remap_count++);
+
+	frr->f_rkey = ibmr->u.fastreg.mr->rkey;
+	frr->f_old_rkey = ibmr->u.fastreg.rkey;
+
+	/* Attach the fastreg info to the MR */
+	ibmr->u.fastreg.pending = frr;
+
+	rds_ib_stats_inc(s_ib_rdma_mr_used);
+	ret = 0;
+
+out:
+	if (ret)
+		rds_ib_fastreg_release(frr);
+
+	return ret;
+}
+
+struct rds_ib_fastreg *rds_ib_rdma_get_fastreg(struct rds_mr *mr)
+{
+	struct rds_ib_mr *ibmr = mr->r_trans_private;
+	struct rds_ib_fastreg *frr;
+	unsigned long flags;
+
+	spin_lock_irqsave(&ibmr->lock, flags);
+	frr = ibmr->u.fastreg.pending;
+	if (frr) {
+		/* FIXME: we need to mark the frr as "locked"
+		 * to prevent FREE_MR from trashing the MR
+		 * as long as the fastreg is on the queue */
+		atomic_inc(&frr->f_refcnt);
+	}
+	spin_unlock_irqrestore(&ibmr->lock, flags);
+
+	return frr;
+}
+
+void rds_ib_fastreg_release(struct rds_ib_fastreg *frr)
+{
+	struct rds_ib_device *rds_ibdev = NULL;
+
+	if (atomic_dec_and_test(&frr->f_refcnt)) {
+		if (frr->f_page_list) ib_free_fast_reg_page_list(frr->f_page_list);
+		rds_ibdev = frr->f_mr->device;
+		rds_ib_drop_scatterlist(rds_ibdev, &frr->f_sg);
+		kfree(frr);
+	}
+}
+
+/*
+ * These functions are called back from the send CQ handler
+ * when the LOCAL_INV or FAST_REG_MR WRs complete.
+ */
+void rds_ib_local_inv_complete(struct rds_ib_fastreg *frr, int status)
+{
+	struct rds_ib_mr *ibmr = frr->f_mr;
+
+	spin_lock(&ibmr->lock);
+	if (ibmr->u.fastreg.pending != frr)
+		goto out_unlock;
+
+	if (status != IB_WC_SUCCESS) {
+		/* Yikes. Invalidation failed. What can we do but complain? */
+		printk(KERN_NOTICE "RDS/iWARP: Unable to invalidate fastreg MR.\n");
+		goto out_unlock;
+	}
+
+	if (frr->f_old_rkey == ibmr->u.fastreg.rkey) {
+		ibmr->u.fastreg.rkey = RDS_IB_INVALID_FASTREG_KEY;
+		/* Now we can unpin any memory pinned for this MR. */
+		rds_ib_teardown_mr(ibmr);
+	}
+	frr->f_old_rkey = RDS_IB_INVALID_FASTREG_KEY;
+
+out_unlock:
+	spin_unlock(&ibmr->lock);
+
+	/* The WR owned a reference to this frr. Drop it */
+	rds_ib_fastreg_release(frr);
+}
+
+void rds_ib_fast_reg_complete(struct rds_ib_fastreg *frr, int status)
+{
+	struct rds_ib_mr *ibmr = frr->f_mr;
+
+	spin_lock(&ibmr->lock);
+
+	/* Technically, this would be a bug */
+	if (ibmr->u.fastreg.pending != frr)
+		goto out_unlock;
+
+	if (status != IB_WC_SUCCESS) {
+		/* Yikes. We were unable to register the application's
+		 * memory. We have no way of notifying the application.
+		 * We could probably tear down the QP and cry uncle, but
+		 * the SEND may already have gone out.
+		 * The only solace is that the RDMA initiated by the remote
+		 * will fail, because the key isn't valid.
+		 */
+		if (printk_ratelimit())
+			printk(KERN_NOTICE "RDS/iWARP: Unable to "
+					"perform fast memory registration.\n");
+		goto out_unlock;
+	}
+
+	ibmr->sg = frr->f_sg;
+	ibmr->u.fastreg.page_list = frr->f_page_list;
+	ibmr->u.fastreg.rkey = frr->f_rkey;
+	ibmr->map_seq = atomic_read(&ibmr->pool->flush_seq);
+
+	/* Detach frr from MR. We still have at least one ref after this */
+	ibmr->u.fastreg.pending = NULL;
+	frr->f_done = 1;
+	rds_ib_fastreg_release(frr);
+
+out_unlock:
+	spin_unlock(&ibmr->lock);
+
+	/* The WR owned a reference to this frr. Drop it */
+	rds_ib_fastreg_release(frr);
+}
+
+static void rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
+				struct list_head *unmap_list)
+{
+	unsigned int flush_seq = atomic_read(&pool->flush_seq);
+	struct rds_ib_mr *ibmr, *next;
+	LIST_HEAD(laundered);
+	int ret;
+
+	/* Batched invalidation of fastreg MRs.
+	 * Why do we do it this way, even though we could pipeline unmap
+	 * and remap? The reason is the application semantics - when the
+	 * application requests an invalidation of MRs, it expects all
+	 * previously released R_Keys to become invalid.
+	 *
+	 * If we implement MR reuse naively, we risk memory corruption
+	 * (this has actually been observed). So the default behavior
+	 * requires that a MR goes through an explicit unmap operation before
+	 * we can reuse it again.
+	 *
+	 * We could probably improve on this a little, by allowing immediate
+	 * reuse of a MR on the same socket (eg you could add small
+	 * cache of unused MRs to strct rds_socket - GET_MR could grab one
+	 * of these without requiring an explicit invalidate).
+	 */
+	while (!list_empty(unmap_list)) {
+		unsigned long flags;
+
+		/* We need to take the lock, because recycled MRs may
+		 * ooze from the unmap list back to the clean list */
+		spin_lock_irqsave(&pool->list_lock, flags);
+		list_for_each_entry_safe(ibmr, next, unmap_list, list) {
+			if (ibmr->map_seq == 0
+			 || ibmr->map_seq >= flush_seq)
+				list_move(&ibmr->list, &laundered);
+
+			/* Do we need to submit a WR to pool->qp? */
+			if (ibmr->u.fastreg.pending) {
+				/* FIXME: post a dummy fastreg with f_old_rkey set, no page_list */
+			}
+		}
+		spin_unlock_irqrestore(&pool->list_lock, flags);
+	}
+
+	/* Fill in the blanks:
+	    Go through the list of dirty MRs, and post LOCAL_INV WRs to the
+	    dummy pool->qp. When the completion for the last WR arrives,
+	    the CQ handler wakes up the caller.
+	  */
+	BUG(); /* not implemented yet. */
+}
+
+static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool,
+		struct rds_ib_mr *ibmr)
+{
+	if (ibmr->u.fastreg.page_list)
+		ib_free_fast_reg_page_list(ibmr->u.fastreg.page_list);
+	if (ibmr->u.fastreg.mr)
+		ib_dereg_mr(ibmr->u.fastreg.mr);
+	if (ibmr->u.fastreg.pending)
+		rds_ib_fastreg_release(ibmr->u.fastreg.pending);
+}
+
Index: build-2.6/net/rds/ib_send.c
===================================================================
--- build-2.6.orig/net/rds/ib_send.c
+++ build-2.6/net/rds/ib_send.c
@@ -135,6 +135,7 @@ void rds_ib_send_init_ring(struct rds_ib
 
 		send->s_rm = NULL;
 		send->s_op = NULL;
+		send->s_fastreg = NULL;
 
 		send->s_wr.wr_id = i;
 		send->s_wr.sg_list = send->s_sge;
@@ -165,6 +166,8 @@ void rds_ib_send_clear_ring(struct rds_i
 			rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
 		if (send->s_op)
 			rds_ib_send_unmap_rdma(ic, send->s_op);
+		if (send->s_fastreg)
+			rds_ib_fastreg_release(send->s_fastreg);
 	}
 }
 
@@ -223,6 +226,16 @@ void rds_ib_send_cq_comp_handler(struct 
 				/* Nothing to be done - the SG list will be unmapped
 				 * when the SEND completes. */
 				break;
+			case IB_WR_LOCAL_INV:
+				/* We invalidated an r_key. the caller may want to
+				 * learn about this. */
+				if (send->s_fastreg)
+					rds_ib_local_inv_complete(send->s_fastreg, wc.status);
+				break;
+			case IB_WR_FAST_REG_MR:
+				if (send->s_fastreg)
+					rds_ib_fast_reg_complete(send->s_fastreg, wc.status);
+				break;
 			default:
 				if (printk_ratelimit())
 					printk(KERN_NOTICE
@@ -484,6 +497,21 @@ int rds_ib_xmit(struct rds_connection *c
 	BUG_ON(off % RDS_FRAG_SIZE);
 	BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
 
+	/* Fastreg support */
+	if (rds_rdma_cookie_key(rm->m_rdma_cookie)
+	 && ic->i_fastreg
+	 && !test_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags)) {
+		ret = rds_ib_xmit_fastreg(conn, rm->m_rdma_mr);
+		if (ret)
+			goto out;
+
+		/* We don't release the fastreg yet - we can only
+		 * do that when it has completed. If the connection
+		 * goes down, and we re-queue the message, we would
+		 * have to retry the registration. */
+		set_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags);
+	}
+
 	/* FIXME we may overallocate here */
 	if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
 		i = 1;
@@ -849,6 +877,108 @@ out:
 	return ret;
 }
 
+static int __rds_ib_xmit_fastreg(struct rds_connection *conn,
+				 struct rds_ib_fastreg *frr)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rds_ib_send_work *send = NULL;
+	struct rds_ib_send_work *first;
+	struct ib_send_wr *failed_wr;
+	u32 pos;
+	u32 work_alloc = 0;
+	int ret;
+	int num_wrs;
+
+	/*
+	 * Perform 2 WRs for the fast_reg_mr's and chain them together.  The
+	 * first WR is used to invalidate the old rkey, and the second WR is
+	 * used to define the new fast_reg_mr request.  Each individual page
+	 * in the sg list is added to the fast reg page list and placed
+	 * inside the fast_reg_mr WR.  The key used is a rolling 8bit
+	 * counter, which should guarantee uniqueness.
+	 */
+	num_wrs = 0;
+	if (frr->f_old_rkey != RDS_IB_INVALID_FASTREG_KEY)
+		num_wrs++;
+	if (frr->f_page_list)
+		num_wrs++;
+	if (!num_wrs)
+		return 0;
+
+	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, num_wrs, &pos);
+	if (work_alloc != num_wrs) {
+		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		rds_ib_stats_inc(s_ib_tx_ring_full);
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	first = send = &ic->i_sends[pos];
+
+	if (frr->f_old_rkey != RDS_IB_INVALID_FASTREG_KEY) {
+		memset(send, 0, sizeof(*send));
+		send->s_wr.opcode = IB_WR_LOCAL_INV;
+		send->s_wr.ex.invalidate_rkey = frr->f_old_rkey;
+		send->s_fastreg = frr;
+		send->s_queued = jiffies;
+
+		/* Get the next WR */
+		pos = (pos + 1) % ic->i_send_ring.w_nr;
+		send = &ic->i_sends[pos];
+	}
+
+	if (frr->f_page_list) {
+		memset(send, 0, sizeof(*send));
+		send->s_wr.opcode = IB_WR_FAST_REG_MR;
+		send->s_wr.wr.fast_reg.length = frr->f_length;
+		send->s_wr.wr.fast_reg.rkey = frr->f_rkey;
+		send->s_wr.wr.fast_reg.page_list = frr->f_page_list;
+		send->s_wr.wr.fast_reg.page_list_len = frr->f_page_list_len;
+		send->s_wr.wr.fast_reg.page_shift = frr->f_page_shift;
+		send->s_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
+					IB_ACCESS_REMOTE_READ |
+					IB_ACCESS_REMOTE_WRITE;
+		send->s_fastreg = frr;
+		send->s_queued = jiffies;
+	}
+
+	atomic_add(num_wrs, &frr->f_refcnt);
+
+	/* Chain the two WRs together */
+	if (num_wrs == 2)
+		first->s_wr.next = &send->s_wr;
+
+	failed_wr = &first->s_wr;
+	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
+
+	rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
+		 first, &first->s_wr, ret, failed_wr);
+	BUG_ON(failed_wr != &first->s_wr);
+	if (ret) {
+		printk(KERN_WARNING "RDS/IB: fastreg ib_post_send to %u.%u.%u.%u "
+		       "returned %d\n", NIPQUAD(conn->c_faddr), ret);
+		while (num_wrs--)
+			rds_ib_fastreg_release(frr);
+		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		return ret;
+	}
+
+out:
+	return ret;
+}
+
+int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr)
+{
+	struct rds_ib_fastreg *frr;
+
+	frr = rds_ib_rdma_get_fastreg(mr);
+	if (!frr)
+		return 0;
+	if (IS_ERR(frr))
+		return PTR_ERR(frr);
+	return __rds_ib_xmit_fastreg(conn, frr);
+}
+
 void rds_ib_xmit_complete(struct rds_connection *conn)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
Index: build-2.6/net/rds/rds.h
===================================================================
--- build-2.6.orig/net/rds/rds.h
+++ build-2.6/net/rds/rds.h
@@ -278,6 +278,7 @@ struct rds_incoming {
 #define RDS_MSG_RETRANSMITTED	5
 #define RDS_MSG_MAPPED		6
 #define RDS_MSG_PAGEVEC		7
+#define RDS_MSG_FASTREG_POSTED	8
 
 struct rds_message {
 	atomic_t		m_refcount;
Index: build-2.6/net/rds/send.c
===================================================================
--- build-2.6.orig/net/rds/send.c
+++ build-2.6/net/rds/send.c
@@ -84,6 +84,10 @@ void rds_send_reset(struct rds_connectio
 	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
 		set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
 		set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
+		/* If we were in the process of performing a fastreg
+		 * memory registration when the connection went down,
+		 * we have to retry it. */
+		clear_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags);
 	}
 	list_splice_init(&conn->c_retrans, &conn->c_send_queue);
 	spin_unlock_irqrestore(&conn->c_lock, flags);
@@ -765,6 +769,9 @@ static int rds_cmsg_send(struct rds_sock
 		if (cmsg->cmsg_level != SOL_RDS)
 			continue;
 
+		/* As a side effect, RDMA_DEST and RDMA_MAP will set
+		 * rm->m_rdma_cookie and rm->m_rdma_mr.
+		 */
 		switch (cmsg->cmsg_type) {
 		case RDS_CMSG_RDMA_ARGS:
 			ret = rds_cmsg_rdma_args(rs, rm, cmsg);
_______________________________________________
general mailing list
[email protected]
http://lists.openfabrics.org/cgi-bin/mailman/listinfo/general

To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general

Reply via email to