On Thursday 03 July 2008 23:34:12 Jon Mason wrote:
> This patch adds support for running RDS over iWARP adapters.  It
Hi Jon,

I took your patch and tried to isolate the iWARP-specific changes
needed for bcopy mode, and roll them into a smaller patch that doesn't
duplicate all of the ib*.[hc] files.

I also tried to come up with a working solution for RDMA - as you can
see from the deluge of messages I wrote on this :-), the approach you
chose has some problems.

Please take a look at the attached patch and let me know whether
(a) bcopy mode works, and (b) the RDMA approach can work with
iWARP NICs.

Olaf

-- 
Olaf Kirch  |  --- o --- Nous sommes du soleil we love when we play
[EMAIL PROTECTED] |    / | \   sol.dhoop.naytheet.ah kin.ir.samse.qurax
From: Olaf Kirch <[EMAIL PROTECTED]>
Subject: [PATCH RFC] RDS: Add iWARP Support

This is based on the work posted by Jon Mason. It extracts
the iWARP-specific changes that are needed to support bcopy
mode (I hope I caught all of them).

I also did some work on RDMA support. This is a lot harder,
because the interface and implementation were designed with
classic MRs in mind. However, I think the approach taken below
may result in a working implementation (it's not working yet - I left
some blanks and BUG() asserts in there, because I wanted to get this
patch out as an RFC sooner rather than later).

Also, this is a pretty large patch - it needs to be broken down into
half a dozen or so smaller functional changes for better review.

Olaf
---
 net/rds/ib.c      |   30 ++
 net/rds/ib.h      |   55 ++++
 net/rds/ib_cm.c   |   36 ++-
 net/rds/ib_rdma.c |  610 +++++++++++++++++++++++++++++++++++++++++++++---------
 net/rds/ib_recv.c |    2 
 net/rds/ib_send.c |  133 +++++++++++
 net/rds/message.c |    2 
 net/rds/rdma.c    |   17 -
 net/rds/rdma.h    |    7 
 net/rds/rds.h     |    4 
 net/rds/send.c    |    7 
 11 files changed, 778 insertions(+), 125 deletions(-)

Index: build-2.6/net/rds/ib.c
===================================================================
--- build-2.6.orig/net/rds/ib.c
+++ build-2.6/net/rds/ib.c
@@ -42,6 +42,7 @@
 
 unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE;
 unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */
+unsigned int fastreg_pool_size = RDS_FMR_POOL_SIZE;
 
 module_param(fmr_pool_size, int, 0444);
 MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA");
@@ -85,21 +86,38 @@ void rds_ib_add_one(struct ib_device *de
 	rds_ibdev->fmr_page_size  = 1 << rds_ibdev->fmr_page_shift;
 	rds_ibdev->fmr_page_mask  = ~((u64) rds_ibdev->fmr_page_size - 1);
 	rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
-	rds_ibdev->max_fmrs = dev_attr->max_fmr?
-			min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
-			fmr_pool_size;
+	rds_ibdev->max_fmrs = dev_attr->max_fmr;
 
 	rds_ibdev->dev = device;
 	rds_ibdev->pd = ib_alloc_pd(device);
 	if (IS_ERR(rds_ibdev->pd))
 		goto free_dev;
 
-	rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
-				      IB_ACCESS_LOCAL_WRITE);
+	if (device->node_type != RDMA_NODE_RNIC) {
+		rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
+					IB_ACCESS_LOCAL_WRITE);
+	} else {
+		/* Why does it have to have these permissions? */
+		rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
+					IB_ACCESS_REMOTE_READ |
+					IB_ACCESS_REMOTE_WRITE |
+					IB_ACCESS_LOCAL_WRITE);
+	}
 	if (IS_ERR(rds_ibdev->mr))
 		goto err_pd;
 
-	rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
+	/* Create the MR pool. We choose different strategies for
+	 * MRs depending on the hardware.
+	 */
+	if (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
+		/* Use fast registrations */
+		rds_ibdev->mr_pool = rds_ib_create_fastreg_pool(rds_ibdev);
+		rds_ibdev->use_fastreg = 1;
+	} else {
+		/* Default: use FMRs. Would be nice if there was
+		 * a capability flag to test for. */
+		rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
+	}
 	if (IS_ERR(rds_ibdev->mr_pool)) {
 		rds_ibdev->mr_pool = NULL;
 		goto err_mr;
Index: build-2.6/net/rds/ib.h
===================================================================
--- build-2.6.orig/net/rds/ib.h
+++ build-2.6/net/rds/ib.h
@@ -49,9 +49,51 @@ struct rds_ib_connect_private {
 	__be32			dp_credit;		/* non-zero enables flow ctl */
 };
 
+struct rds_ib_scatterlist {
+	struct scatterlist *	list;
+	unsigned int		len;
+	int			dma_len;
+};
+
+/* We need to post a LOCAL_INV request unless f_old_rkey
+ * has this value. */
+#define RDS_IB_INVALID_FASTREG_KEY 0
+
+struct rds_ib_fastreg {
+	atomic_t		f_refcnt;
+	unsigned int		f_posted : 1,
+				f_done : 1;
+
+	u32			f_old_rkey;
+
+	u32			f_rkey;
+	unsigned int		f_length;
+
+	struct rds_ib_scatterlist f_sg;
+
+	struct ib_fast_reg_page_list *f_page_list;
+	unsigned int		f_page_list_len;
+	unsigned int		f_page_shift;
+
+#if 0
+	u32			f_invalidate_rkey;
+	struct ib_send_wr	f_wr;
+	wait_queue_head_t	f_waitq;
+	struct list_head	f_list;
+	unsigned int		f_done;
+	int			f_status;
+#endif
+
+	struct rds_ib_mr	*f_mr;
+};
+
 struct rds_ib_send_work {
 	struct rds_message	*s_rm;
+
+	/* We should really put these into a union: */
 	struct rds_rdma_op	*s_op;
+	struct rds_ib_fastreg	*s_fastreg;
+
 	struct ib_send_wr	s_wr;
 	struct ib_sge		s_sge[RDS_IB_MAX_SGE];
 	unsigned long		s_queued;
@@ -86,6 +128,7 @@ struct rds_ib_connection {
 	struct rds_header	*i_send_hdrs;
 	u64			i_send_hdrs_dma;
 	struct rds_ib_send_work *i_sends;
+	struct list_head	i_fastreg_pending;
 
 	/* rx */
 	struct mutex		i_recv_mutex;
@@ -123,7 +166,9 @@ struct rds_ib_connection {
 	atomic_t		i_credits;
 
   	/* Protocol version specific information */
-	unsigned int		i_flowctl : 1;	/* enable/disable flow ctl */
+	unsigned int		i_flowctl : 1,	/* enable/disable flow ctl */
+				i_iwarp   : 1,	/* this is actually iWARP not IB */
+				i_fastreg : 1;	/* use fastreg */
 
 	/* Batched completions */
 	unsigned int		i_unsignaled_wrs;
@@ -154,6 +199,7 @@ struct rds_ib_device {
 	unsigned int		fmr_max_remaps;
 	unsigned int		max_fmrs;
 	int			max_sge;
+	unsigned int		use_fastreg : 1;
 	spinlock_t		spinlock;
 };
 
@@ -236,6 +282,7 @@ extern void rds_ib_remove_one(struct ib_
 extern struct ib_client rds_ib_client;
 
 extern unsigned int fmr_pool_size;
+extern unsigned int fastreg_pool_size;
 extern unsigned int fmr_message_size;
 
 /* ib_cm.c */
@@ -254,6 +301,7 @@ void __rds_ib_conn_error(struct rds_conn
 /* ib_rdma.c */
 int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr);
 struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *);
+struct rds_ib_mr_pool *rds_ib_create_fastreg_pool(struct rds_ib_device *);
 void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_ib_connection *iinfo);
 void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
@@ -261,6 +309,10 @@ void *rds_ib_get_mr(struct scatterlist *
 void rds_ib_sync_mr(void *trans_private, int dir);
 void rds_ib_free_mr(void *trans_private, int invalidate);
 void rds_ib_flush_mrs(void);
+struct rds_ib_fastreg *rds_ib_rdma_get_fastreg(struct rds_mr *);
+void rds_ib_fastreg_release(struct rds_ib_fastreg *frr);
+void rds_ib_local_inv_complete(struct rds_ib_fastreg *frr, int status);
+void rds_ib_fast_reg_complete(struct rds_ib_fastreg *frr, int status);
 
 /* ib_recv.c */
 int __init rds_ib_recv_init(void);
@@ -298,6 +350,7 @@ void rds_ib_send_cq_comp_handler(struct 
 void rds_ib_send_init_ring(struct rds_ib_connection *ic);
 void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
 int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op);
+int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr);
 void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
 void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
 int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
Index: build-2.6/net/rds/ib_rdma.c
===================================================================
--- build-2.6.orig/net/rds/ib_rdma.c
+++ build-2.6/net/rds/ib_rdma.c
@@ -45,20 +45,31 @@ extern struct list_head rds_ib_devices;
 struct rds_ib_mr {
 	struct rds_ib_device	*device;
 	struct rds_ib_mr_pool	*pool;
-	struct ib_fmr		*fmr;
+
+	spinlock_t		lock;
+	union {
+	    struct {
+		struct ib_fmr	*fmr;
+	    } ib;
+	    struct {
+		struct ib_fast_reg_page_list *page_list;
+		struct ib_mr	*fastreg_mr;
+		u32		rkey;
+		struct rds_ib_fastreg *pending;
+	    } iwarp;
+	} u;
 	struct list_head	list;
 	unsigned int		remap_count;
 
-	struct scatterlist *	sg;
-	unsigned int		sg_len;
-	u64 *			dma;
-	int			sg_dma_len;
+	struct rds_ib_scatterlist sg;
 };
 
 /*
  * Our own little FMR pool
  */
 struct rds_ib_mr_pool {
+	struct rds_ib_device *	device;
+
 	struct mutex		flush_lock;		/* serialize fmr invalidate */
 	struct work_struct	flush_worker;		/* flush worker */
 
@@ -68,16 +79,57 @@ struct rds_ib_mr_pool {
 	struct list_head	drop_list;		/* MRs that have reached their max_maps limit */
 	struct list_head	free_list;		/* unused MRs */
 	struct list_head	clean_list;		/* unused & unamapped MRs */
+	struct list_head	fastreg_list;		/* pending fastreg's */
 	atomic_t		free_pinned;		/* memory pinned by free MRs */
+	unsigned long		max_message_size;	/* in pages */
 	unsigned long		max_items;
 	unsigned long		max_items_soft;
 	unsigned long		max_free_pinned;
 	struct ib_fmr_attr	fmr_attr;
+
+	/* Dummy QP used to handle invalidate for fastreg */
+	struct ib_qp		*qp;
+
+	struct rds_ib_mr_pool_ops *op;
+};
+
+struct rds_ib_mr_pool_ops {
+	int			(*init)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
+	int			(*map)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
+					struct scatterlist *sg, unsigned int sg_len);
+	void			(*unmap)(struct rds_ib_mr_pool *, struct list_head *);
+	void			(*destroy)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
 };
 
 static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all);
 static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
 static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
+static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool,
+			  struct rds_ib_mr *ibmr,
+	       		  struct scatterlist *sg, unsigned int nents);
+static void rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool, struct list_head *unmap_list);
+static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
+			  struct rds_ib_mr *ibmr,
+	       		  struct scatterlist *sg, unsigned int nents);
+static void rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool, struct list_head *unmap_list);
+static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+
+static struct rds_ib_mr_pool_ops rds_ib_fmr_pool_ops = {
+	.init		= rds_ib_init_fmr,
+	.map		= rds_ib_map_fmr,
+	.unmap		= rds_ib_unmap_fmr_list,
+	.destroy	= rds_ib_destroy_fmr,
+};
+
+static struct rds_ib_mr_pool_ops rds_ib_fastreg_pool_ops = {
+	.init		= rds_ib_init_fastreg,
+	.map		= rds_ib_map_fastreg,
+	.unmap		= rds_ib_unmap_fastreg_list,
+	.destroy	= rds_ib_destroy_fastreg,
+};
 
 int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
 {
@@ -124,7 +176,158 @@ struct rds_ib_device* ib_get_device(__be
 	return NULL;
 }
 
-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+static void rds_ib_set_scatterlist(struct rds_ib_scatterlist *sg,
+		struct scatterlist *list,
+		unsigned int sg_len, unsigned int sg_dma_len)
+{
+	sg->list = list;
+	sg->len = sg_len;
+	sg->dma_len = sg_dma_len;
+}
+
+static void rds_ib_rdma_drop_scatterlist(struct rds_ib_device *rds_ibdev,
+		struct rds_ib_scatterlist *sg)
+{
+	if (sg->dma_len) {
+		ib_dma_unmap_sg(rds_ibdev->dev,
+				sg->list, sg->len,
+				DMA_BIDIRECTIONAL);
+		sg->dma_len = 0;
+	}
+
+	/* Release the s/g list */
+	if (sg->len) {
+		unsigned int i;
+
+		for (i = 0; i < sg->len; ++i) {
+			struct page *page = sg_page(&sg->list[i]);
+
+			/* FIXME we need a way to tell a r/w MR
+			 * from a r/o MR */
+			set_page_dirty(page);
+			put_page(page);
+		}
+		kfree(sg->list);
+
+		sg->list = NULL;
+		sg->len = 0;
+	}
+}
+
+/*
+ * IB FMR handling
+ */
+static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool,
+				struct rds_ib_mr *ibmr)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct ib_fmr *fmr;
+
+	fmr = ib_alloc_fmr(rds_ibdev->pd,
+			(IB_ACCESS_LOCAL_WRITE |
+			 IB_ACCESS_REMOTE_READ |
+			 IB_ACCESS_REMOTE_WRITE),
+			&pool->fmr_attr);
+	if (IS_ERR(fmr)) {
+		int err = PTR_ERR(fmr);
+
+		printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
+		return err;
+	}
+
+	ibmr->u.ib.fmr = fmr;
+	return 0;
+}
+
+static void rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool,
+				struct list_head *unmap_list)
+{
+	struct rds_ib_mr *ibmr;
+	LIST_HEAD(fmr_list);
+	int ret;
+
+	/* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
+	list_for_each_entry(ibmr, unmap_list, list)
+		list_add(&ibmr->u.ib.fmr->list, &fmr_list);
+	ret = ib_unmap_fmr(&fmr_list);
+	if (ret)
+		printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
+}
+
+static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool,
+				struct rds_ib_mr *ibmr)
+{
+	if (ibmr->u.ib.fmr)
+		ib_dealloc_fmr(ibmr->u.ib.fmr);
+	ibmr->u.ib.fmr = NULL;
+}
+
+/*
+ * iWARP fastreg handling
+ */
+static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
+				struct rds_ib_mr *ibmr)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct ib_mr *mr;
+
+	mr = ib_alloc_fast_reg_mr(rds_ibdev->pd, pool->max_message_size);
+	if (IS_ERR(mr)) {
+		int err = PTR_ERR(mr);
+
+		printk(KERN_WARNING "RDS/IWARP: ib_alloc_fast_reg_mr failed (err=%d)\n", err);
+		return err;
+	}
+
+	ibmr->u.iwarp.rkey = RDS_IB_INVALID_FASTREG_KEY;
+	ibmr->u.iwarp.fastreg_mr = mr;
+	return 0;
+}
+
+static void rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
+				struct list_head *unmap_list)
+{
+	LIST_HEAD(fmr_list);
+
+	/* Batched invalidation of fastreg MRs.
+	 * Why do we do it this way, even though we could pipeline unmap
+	 * and remap? The reason is the application semantics - when the
+	 * application requests an invalidation of MRs, it expects all
+	 * previously released R_Keys to become invalid.
+	 *
+	 * If we implement MR reuse naively, we risk memory corruption
+	 * (this has actually been observed). So the default behavior
+	 * requires that an MR goes through an explicit unmap operation before
+	 * we can reuse it again.
+	 *
+	 * We could probably improve on this a little, by allowing immediate
+	 * reuse of an MR on the same socket (e.g. you could add a small
+	 * cache of unused MRs to struct rds_socket - GET_MR could grab one
+	 * of these without requiring an explicit invalidate).
+	 */
+
+	/* Fill in the blanks:
+	 *  Go through the list of dirty MRs, and post LOCAL_INV WRs to the
+	 *  dummy pool->qp. When the completion for the last WR arrives,
+	 *  the CQ handler wakes up the caller.
+	 */
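+	/* A rough sketch of what that could look like - kept inside
+	 * "#if 0" because the completion/wakeup plumbing doesn't exist
+	 * yet, and the WRs would need to live until their completions
+	 * arrive instead of sitting on the stack:
+	 */
+#if 0
+	struct rds_ib_mr *ibmr;
+
+	list_for_each_entry(ibmr, unmap_list, list) {
+		struct ib_send_wr wr, *failed_wr;
+
+		if (ibmr->u.iwarp.rkey == RDS_IB_INVALID_FASTREG_KEY)
+			continue;
+
+		memset(&wr, 0, sizeof(wr));
+		wr.opcode = IB_WR_LOCAL_INV;
+		wr.ex.invalidate_rkey = ibmr->u.iwarp.rkey;
+		wr.send_flags = IB_SEND_SIGNALED;
+		if (ib_post_send(pool->qp, &wr, &failed_wr))
+			break;
+	}
+	/* ... sleep until the CQ handler signals the last completion ... */
+#endif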
+	BUG(); /* not implemented yet. */
+}
+
+static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool,
+		struct rds_ib_mr *ibmr)
+{
+	if (ibmr->u.iwarp.page_list)
+		ib_free_fast_reg_page_list(ibmr->u.iwarp.page_list);
+	if (ibmr->u.iwarp.fastreg_mr)
+		ib_dereg_mr(ibmr->u.iwarp.fastreg_mr);
+	if (ibmr->u.iwarp.pending)
+		rds_ib_fastreg_release(ibmr->u.iwarp.pending);
+}
+
+struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
+		unsigned int message_size, unsigned int pool_size,
+		struct rds_ib_mr_pool_ops *ops)
 {
 	struct rds_ib_mr_pool *pool;
 
@@ -132,25 +335,68 @@ struct rds_ib_mr_pool *rds_ib_create_mr_
 	if (!pool)
 		return ERR_PTR(-ENOMEM);
 
+	pool->device = rds_ibdev;
 	INIT_LIST_HEAD(&pool->free_list);
 	INIT_LIST_HEAD(&pool->drop_list);
 	INIT_LIST_HEAD(&pool->clean_list);
+	INIT_LIST_HEAD(&pool->fastreg_list);
 	mutex_init(&pool->flush_lock);
 	spin_lock_init(&pool->list_lock);
 	INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
 
-	pool->fmr_attr.max_pages = fmr_message_size;
-	pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
-	pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
-	pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4;
+	pool->max_message_size = message_size;
+	pool->max_items = pool_size;
+	pool->max_free_pinned = pool->max_items * pool->max_message_size / 4;
 
 	/* We never allow more than max_items MRs to be allocated.
 	 * When we exceed more than max_items_soft, we start freeing
 	 * items more aggressively.
 	 * Make sure that max_items > max_items_soft > max_items / 2
 	 */
-	pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
-	pool->max_items = rds_ibdev->max_fmrs;
+	pool->max_items_soft = pool->max_items * 3 / 4;
+
+	return pool;
+}
+
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+{
+	struct rds_ib_mr_pool *pool;
+	unsigned int pool_size = fmr_pool_size;
+
+	if (rds_ibdev->max_fmrs && rds_ibdev->max_fmrs < pool_size)
+		pool_size = rds_ibdev->max_fmrs;
+
+	pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size, pool_size,
+				&rds_ib_fmr_pool_ops);
+
+	if (!IS_ERR(pool)) {
+		pool->fmr_attr.max_pages = pool->max_message_size;
+		pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
+		pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
+	}
+
+	return pool;
+}
+
+struct rds_ib_mr_pool *rds_ib_create_fastreg_pool(struct rds_ib_device *rds_ibdev)
+{
+	struct rds_ib_mr_pool *pool;
+	unsigned int pool_size = fastreg_pool_size;
+
+	if (rds_ibdev->max_fmrs && rds_ibdev->max_fmrs < pool_size)
+		pool_size = rds_ibdev->max_fmrs;
+
+	pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size,
+				pool_size,
+				&rds_ib_fastreg_pool_ops);
+
+	if (!IS_ERR(pool)) {
+		/* Fill in the blanks:
+		 *  create a dummy QP to which we can post LOCAL_INV
+		 *  requests when invalidating MRs
+		 */
+		pool->qp = NULL;
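+		/* One possible shape for the dummy QP, kept inside
+		 * "#if 0" since it is untested and both the completion
+		 * handler (rds_ib_inv_cq_handler) and the queue sizes
+		 * are placeholders:
+		 */
+#if 0
+		struct ib_qp_init_attr init_attr;
+		struct ib_cq *cq;
+
+		cq = ib_create_cq(rds_ibdev->dev, rds_ib_inv_cq_handler,
+				NULL, pool, pool_size, 0);
+		if (!IS_ERR(cq)) {
+			memset(&init_attr, 0, sizeof(init_attr));
+			init_attr.send_cq = cq;
+			init_attr.recv_cq = cq;
+			init_attr.cap.max_send_wr = pool_size;
+			init_attr.cap.max_send_sge = 1;
+			init_attr.qp_type = IB_QPT_RC;
+			pool->qp = ib_create_qp(rds_ibdev->pd, &init_attr);
+		}
+#endif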
+	}
 
 	return pool;
 }
@@ -169,6 +415,10 @@ void rds_ib_destroy_mr_pool(struct rds_i
 	rds_ib_flush_mr_pool(pool, 1);
 	BUG_ON(atomic_read(&pool->item_count));
 	BUG_ON(atomic_read(&pool->free_pinned));
+
+	if (pool->qp)
+		ib_destroy_qp(pool->qp);
+
 	kfree(pool);
 }
 
@@ -227,77 +477,82 @@ static struct rds_ib_mr *rds_ib_alloc_fm
 		goto out_no_cigar;
 	}
 
-	ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
-			(IB_ACCESS_LOCAL_WRITE |
-			 IB_ACCESS_REMOTE_READ |
-			 IB_ACCESS_REMOTE_WRITE),
-			&pool->fmr_attr);
-	if (IS_ERR(ibmr->fmr)) {
-		err = PTR_ERR(ibmr->fmr);
-		ibmr->fmr = NULL;
-		printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
+	spin_lock_init(&ibmr->lock);
+
+	err = pool->op->init(pool, ibmr);
+	if (err)
 		goto out_no_cigar;
-	}
 
 	rds_ib_stats_inc(s_ib_rdma_mr_alloc);
 	return ibmr;
 
 out_no_cigar:
 	if (ibmr) {
-		if (ibmr->fmr)
-			ib_dealloc_fmr(ibmr->fmr);
+		pool->op->destroy(pool, ibmr);
 		kfree(ibmr);
 	}
 	atomic_dec(&pool->item_count);
 	return ERR_PTR(err);
 }
 
-static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
-	       struct scatterlist *sg, unsigned int nents)
+static int rds_ib_count_dma_pages(struct rds_ib_device *rds_ibdev,
+			  struct scatterlist *sg, unsigned int sg_dma_len,
+			  unsigned int *lenp)
 {
 	struct ib_device *dev = rds_ibdev->dev;
-	struct scatterlist *scat = sg;
-	u64 io_addr = 0;
-	u64 *dma_pages;
-	u32 len;
-	int page_cnt, sg_dma_len;
-	int i, j;
-	int ret;
-
-	sg_dma_len = ib_dma_map_sg(dev, sg, nents,
-				 DMA_BIDIRECTIONAL);
-	if (unlikely(!sg_dma_len)) {
-	        printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
-		return -EBUSY;
-	}
-
-	len = 0;
-	page_cnt = 0;
+	unsigned int i, page_cnt = 0, len = 0;
 
 	for (i = 0; i < sg_dma_len; ++i) {
-		unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
-		u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
+		unsigned int dma_len = ib_sg_dma_len(dev, &sg[i]);
+		u64 dma_addr = ib_sg_dma_address(dev, &sg[i]);
 	
 		if (dma_addr & ~rds_ibdev->fmr_page_mask) {
 			if (i > 0)
 				return -EINVAL;
-			else
-				++page_cnt;
+			++page_cnt;
 		}
 		if ((dma_addr + dma_len) & ~rds_ibdev->fmr_page_mask) {
 			if (i < sg_dma_len - 1)
 				return -EINVAL;
-			else
-				++page_cnt;
+			++page_cnt;
 		}
 
 		len += dma_len;
 	}
 
 	page_cnt += len >> rds_ibdev->fmr_page_shift;
-	if (page_cnt > fmr_message_size)
+	if (page_cnt > rds_ibdev->mr_pool->max_message_size)
 		return -EINVAL;
 
+	/* Report the total DMA length back to the caller if requested */
+	if (lenp)
+		*lenp = len;
+	return page_cnt;
+}
+
+static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool,
+			  struct rds_ib_mr *ibmr,
+	       		  struct scatterlist *sg, unsigned int nents)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct ib_device *dev = rds_ibdev->dev;
+	struct scatterlist *scat = sg;
+	u64 io_addr = 0;
+	u64 *dma_pages;
+	int page_cnt, sg_dma_len;
+	int i, j;
+	int ret;
+
+	sg_dma_len = ib_dma_map_sg(dev, sg, nents,
+				 DMA_BIDIRECTIONAL);
+	if (unlikely(!sg_dma_len)) {
+	        printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
+		return -EBUSY;
+	}
+
+	/* FIXME: when returning an error, we need to unmap the SG */
+
+	page_cnt = rds_ib_count_dma_pages(rds_ibdev, sg, sg_dma_len, NULL);
+	if (page_cnt < 0)
+		return page_cnt;
+
 	dma_pages = kmalloc(sizeof(u64) * page_cnt, GFP_ATOMIC);
 	if (!dma_pages)
 		return -ENOMEM;
@@ -312,7 +567,7 @@ static int rds_ib_map_fmr(struct rds_ib_
 				(dma_addr & rds_ibdev->fmr_page_mask) + j;
 	}
 				
-	ret = ib_map_phys_fmr(ibmr->fmr,
+	ret = ib_map_phys_fmr(ibmr->u.ib.fmr,
 				   dma_pages, page_cnt, io_addr);	
 	if (ret)
 		goto out;
@@ -321,9 +576,9 @@ static int rds_ib_map_fmr(struct rds_ib_
 	 * safely tear down the old mapping. */
 	rds_ib_teardown_mr(ibmr);
 
-	ibmr->sg = scat;
-	ibmr->sg_len = nents;
-	ibmr->sg_dma_len = sg_dma_len;
+	ibmr->sg.list = scat;
+	ibmr->sg.len = nents;
+	ibmr->sg.dma_len = sg_dma_len;
 	ibmr->remap_count++;
 
 	rds_ib_stats_inc(s_ib_rdma_mr_used);
@@ -335,6 +590,192 @@ out:
 	return ret;
 }
 
+static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
+			struct rds_ib_mr *ibmr,
+	       		struct scatterlist *sg, unsigned int sg_len)
+{
+	struct rds_ib_device *rds_ibdev = pool->device;
+	struct ib_device *dev = rds_ibdev->dev;
+	struct ib_fast_reg_page_list *page_list = NULL;
+	struct rds_ib_fastreg *frr;
+	unsigned int len;
+	int i, j, page_cnt, sg_dma_len = 0;
+	int ret;
+
+	BUG_ON(ibmr->u.iwarp.pending);
+
+	page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, pool->max_message_size);
+	if (IS_ERR(page_list)) {
+		ret = PTR_ERR(page_list);
+		page_list = NULL;
+
+		printk(KERN_WARNING "RDS/iWARP: ib_alloc_fast_reg_page_list failed (err=%d)\n", ret);
+		return ret;
+	}
+
+	sg_dma_len = ib_dma_map_sg(dev, sg, sg_len, DMA_BIDIRECTIONAL);
+	if (unlikely(!sg_dma_len)) {
+	        printk(KERN_WARNING "RDS/iWARP: dma_map_sg failed!\n");
+		ret = -EBUSY;
+		goto out;
+	}
+
+	page_cnt = rds_ib_count_dma_pages(rds_ibdev, sg, sg_dma_len, &len);
+	if (page_cnt < 0) {
+		ret = page_cnt;
+		goto out;
+	}
+
+	page_cnt = 0;
+	for (i = 0; i < sg_dma_len; ++i) {
+		unsigned int dma_len = ib_sg_dma_len(dev, &sg[i]);
+		u64 dma_addr = ib_sg_dma_address(dev, &sg[i]);
+	
+		for (j = 0; j < dma_len; j += rds_ibdev->fmr_page_size)
+			page_list->page_list[page_cnt++] =
+				(dma_addr & rds_ibdev->fmr_page_mask) + j;
+	}
+
+	/* Allocate the fastreg request structure */
+	frr = kzalloc(sizeof(*frr), GFP_KERNEL);
+	if (!frr) {
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	ib_update_fast_reg_key(ibmr->u.iwarp.fastreg_mr, ibmr->remap_count++);
+
+	/* Build the fastreg WR */
+	frr->f_mr = ibmr;
+	rds_ib_set_scatterlist(&frr->f_sg, sg, sg_len, sg_dma_len);
+	frr->f_length = len;
+	frr->f_rkey = ibmr->u.iwarp.fastreg_mr->rkey;
+	frr->f_page_list = page_list;
+	frr->f_page_list_len = page_cnt;
+	frr->f_page_shift = rds_ibdev->fmr_page_shift;
+
+	frr->f_old_rkey = ibmr->u.iwarp.rkey;
+
+	/* Attach the fastreg info to the MR */
+	atomic_set(&frr->f_refcnt, 1);
+	ibmr->u.iwarp.pending = frr;
+
+	rds_ib_stats_inc(s_ib_rdma_mr_used);
+	ret = 0;
+
+out:
+	if (ret) {
+		ib_free_fast_reg_page_list(page_list);
+		if (sg_dma_len)
+			ib_dma_unmap_sg(dev, sg, sg_len, DMA_BIDIRECTIONAL);
+	}
+
+	return ret;
+}
+
+struct rds_ib_fastreg *rds_ib_rdma_get_fastreg(struct rds_mr *mr)
+{
+	struct rds_ib_mr *ibmr = mr->r_trans_private;
+	struct rds_ib_fastreg *frr;
+	unsigned long flags;
+
+	spin_lock_irqsave(&ibmr->lock, flags);
+	frr = ibmr->u.iwarp.pending;
+	if (frr) {
+		/* FIXME: we need to mark the frr as "locked"
+		 * to prevent FREE_MR from trashing the MR
+		 * as long as the fastreg is on the queue */
+		atomic_inc(&frr->f_refcnt);
+	}
+	spin_unlock_irqrestore(&ibmr->lock, flags);
+
+	return frr;
+}
+
+void rds_ib_fastreg_release(struct rds_ib_fastreg *frr)
+{
+	if (atomic_dec_and_test(&frr->f_refcnt)) {
+		/* Take the device from the owning MR. This assumes the
+		 * MR outlives any fastreg requests attached to it. */
+		struct rds_ib_device *rds_ibdev = frr->f_mr->device;
+
+		ib_free_fast_reg_page_list(frr->f_page_list);
+		rds_ib_rdma_drop_scatterlist(rds_ibdev, &frr->f_sg);
+		kfree(frr);
+	}
+}
+
+/*
+ * These functions are called back from the send CQ handler
+ * when the LOCAL_INV or FAST_REG_MR WRs complete.
+ */
+void rds_ib_local_inv_complete(struct rds_ib_fastreg *frr, int status)
+{
+	struct rds_ib_mr *ibmr = frr->f_mr;
+
+	spin_lock(&ibmr->lock);
+	if (ibmr->u.iwarp.pending != frr)
+		goto out_unlock;
+
+	if (status != IB_WC_SUCCESS) {
+		/* Yikes. Invalidation failed. What can we do but complain? */
+		printk(KERN_NOTICE "RDS/iWARP: Unable to invalidate fastreg MR.\n");
+		goto out_unlock;
+	}
+
+	if (frr->f_old_rkey == ibmr->u.iwarp.rkey) {
+		ibmr->u.iwarp.rkey = RDS_IB_INVALID_FASTREG_KEY;
+		/* Now we can unpin any memory pinned for this MR. */
+		rds_ib_teardown_mr(ibmr);
+	}
+	frr->f_old_rkey = RDS_IB_INVALID_FASTREG_KEY;
+
+out_unlock:
+	spin_unlock(&ibmr->lock);
+
+	/* The WR owned a reference to this frr. Drop it */
+	rds_ib_fastreg_release(frr);
+}
+
+void rds_ib_fast_reg_complete(struct rds_ib_fastreg *frr, int status)
+{
+	struct rds_ib_mr *ibmr = frr->f_mr;
+
+	spin_lock(&ibmr->lock);
+
+	/* Technically, this would be a bug */
+	if (ibmr->u.iwarp.pending != frr)
+		goto out_unlock;
+
+	if (status != IB_WC_SUCCESS) {
+		/* Yikes. We were unable to register the application's
+		 * memory. We have no way of notifying the application.
+		 * We could probably tear down the QP and cry uncle, but
+		 * the SEND may already have gone out.
+		 * The only solace is that the RDMA initiated by the remote
+		 * will fail, because the key isn't valid.
+		 */
+		if (printk_ratelimit())
+			printk(KERN_NOTICE "RDS/iWARP: Unable to "
+					"perform fast memory registration.\n");
+		goto out_unlock;
+	}
+
+	ibmr->sg = frr->f_sg;
+	ibmr->u.iwarp.page_list = frr->f_page_list;
+	ibmr->u.iwarp.rkey = frr->f_rkey;
+
+	/* Detach frr from the MR. The WR still holds a reference,
+	 * so marking it done before dropping the pending ref is safe. */
+	frr->f_done = 1;
+	ibmr->u.iwarp.pending = NULL;
+	rds_ib_fastreg_release(frr);
+
+out_unlock:
+	spin_unlock(&ibmr->lock);
+
+	/* The WR owned a reference to this frr. Drop it */
+	rds_ib_fastreg_release(frr);
+}
+
 void rds_ib_sync_mr(void *trans_private, int direction)
 {
 	struct rds_ib_mr *ibmr = trans_private;
@@ -342,49 +783,24 @@ void rds_ib_sync_mr(void *trans_private,
 
 	switch (direction) {
 	case DMA_FROM_DEVICE:
-		ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg,
-			ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
+		ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg.list,
+			ibmr->sg.dma_len, DMA_BIDIRECTIONAL);
 		break;
 	case DMA_TO_DEVICE:
-		ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg,
-			ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
+		ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg.list,
+			ibmr->sg.dma_len, DMA_BIDIRECTIONAL);
 		break;
 	}
 }
 
 static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
 {
-	struct rds_ib_device *rds_ibdev = ibmr->device;
-
-	if (ibmr->sg_dma_len) {
-		ib_dma_unmap_sg(rds_ibdev->dev,
-				ibmr->sg, ibmr->sg_len,
-				DMA_BIDIRECTIONAL);
-		ibmr->sg_dma_len = 0;
-	}
-
-	/* Release the s/g list */
-	if (ibmr->sg_len) {
-		unsigned int i;
-
-		for (i = 0; i < ibmr->sg_len; ++i) {
-			struct page *page = sg_page(&ibmr->sg[i]);
-
-			/* FIXME we need a way to tell a r/w MR
-			 * from a r/o MR */
-			set_page_dirty(page);
-			put_page(page);
-		}
-		kfree(ibmr->sg);
-
-		ibmr->sg = NULL;
-		ibmr->sg_len = 0;
-	}
+	rds_ib_rdma_drop_scatterlist(ibmr->device, &ibmr->sg);
 }
 
 void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
 {
-	unsigned int pinned = ibmr->sg_len;
+	unsigned int pinned = ibmr->sg.len;
 
 	__rds_ib_teardown_mr(ibmr);
 	if (pinned) {
@@ -419,7 +835,6 @@ int rds_ib_flush_mr_pool(struct rds_ib_m
 {
 	struct rds_ib_mr *ibmr, *next;
 	LIST_HEAD(unmap_list);
-	LIST_HEAD(fmr_list);
 	unsigned long unpinned = 0;
 	unsigned long flags;
 	unsigned int nfreed = 0, ncleaned = 0, free_goal;
@@ -443,21 +858,17 @@ int rds_ib_flush_mr_pool(struct rds_ib_m
 	if (list_empty(&unmap_list))
 		goto out;
 
-	/* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
-	list_for_each_entry(ibmr, &unmap_list, list)
-		list_add(&ibmr->fmr->list, &fmr_list);
-	ret = ib_unmap_fmr(&fmr_list);
-	if (ret)
-		printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
+	/* Batched invalidate of dirty MRs: */
+	pool->op->unmap(pool, &unmap_list);
 
 	/* Now we can destroy the DMA mapping and unpin any pages */
 	list_for_each_entry_safe(ibmr, next, &unmap_list, list) {
-		unpinned += ibmr->sg_len;
+		unpinned += ibmr->sg.len;
 		__rds_ib_teardown_mr(ibmr);
 		if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
 			rds_ib_stats_inc(s_ib_rdma_mr_free);
 			list_del(&ibmr->list);
-			ib_dealloc_fmr(ibmr->fmr);
+			pool->op->destroy(pool, ibmr);
 			kfree(ibmr);
 			nfreed++;
 		}
@@ -491,7 +902,7 @@ void rds_ib_free_mr(void *trans_private,
 	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
 	unsigned long flags;
 
-	rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
+	rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg.len);
 
 	/* Return it to the pool's free list */
 	spin_lock_irqsave(&pool->list_lock, flags);
@@ -500,7 +911,7 @@ void rds_ib_free_mr(void *trans_private,
 	} else {
 		list_add(&ibmr->list, &pool->free_list);
 	}
-	atomic_add(ibmr->sg_len, &pool->free_pinned);
+	atomic_add(ibmr->sg.len, &pool->free_pinned);
 	atomic_inc(&pool->dirty_count);
 	spin_unlock_irqrestore(&pool->list_lock, flags);
 
@@ -536,6 +947,7 @@ void *rds_ib_get_mr(struct scatterlist *
 		    __be32 ip_addr, u32 *key_ret)
 {
 	struct rds_ib_device *rds_ibdev;
+	struct rds_ib_mr_pool *pool;
 	struct rds_ib_mr *ibmr = NULL;
 	int ret;
 
@@ -545,7 +957,7 @@ void *rds_ib_get_mr(struct scatterlist *
 		goto out;
 	}
 
-	if (!rds_ibdev->mr_pool) {
+	if (!(pool = rds_ibdev->mr_pool)) {
 		ret = -ENODEV;
 		goto out;
 	}
@@ -554,9 +966,9 @@ void *rds_ib_get_mr(struct scatterlist *
 	if (IS_ERR(ibmr))
 		return ibmr;
 
-	ret = rds_ib_map_fmr(rds_ibdev, ibmr, sg, nents);
+	ret = pool->op->map(pool, ibmr, sg, nents);
 	if (ret == 0)
-		*key_ret = ibmr->fmr->rkey;
+		*key_ret = rds_ibdev->use_fastreg
+				? ibmr->u.iwarp.fastreg_mr->rkey
+				: ibmr->u.ib.fmr->rkey;
 	else
 		printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret);
 
Index: build-2.6/net/rds/rdma.c
===================================================================
--- build-2.6.orig/net/rds/rdma.c
+++ build-2.6/net/rds/rdma.c
@@ -116,11 +116,8 @@ static void rds_destroy_mr(struct rds_mr
 		mr->r_trans->free_mr(trans_private, mr->r_invalidate);
 }
 
-static void rds_mr_put(struct rds_mr *mr)
+void __rds_put_mr_final(struct rds_mr *mr)
 {
-	if (!atomic_dec_and_test(&mr->r_refcount))
-		return;
-
 	rds_destroy_mr(mr);
 	kfree(mr);
 }
@@ -169,7 +166,7 @@ static int rds_pin_pages(unsigned long u
 }
 
 static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
-				u64 *cookie_ret)
+				u64 *cookie_ret, struct rds_mr **mr_ret)
 {
 	struct rds_mr *mr = NULL, *found;
 	unsigned int nr_pages;
@@ -297,6 +294,10 @@ static int __rds_rdma_map(struct rds_soc
 
 	rdsdebug("RDS: get_mr key is %x\n", mr->r_key);
 
+	if (mr_ret) {
+		atomic_inc(&mr->r_refcount);
+		*mr_ret = mr;
+	}
 	ret = 0;
 out:
 	if (pages)
@@ -317,7 +318,7 @@ int rds_get_mr(struct rds_sock *rs, char
 			   sizeof(struct rds_get_mr_args)))
 		return -EFAULT;
 
-	return __rds_rdma_map(rs, &args, NULL);
+	return __rds_rdma_map(rs, &args, NULL, NULL);
 }
 
 /*
@@ -655,7 +656,7 @@ int rds_cmsg_rdma_dest(struct rds_sock *
 
 	if (mr) {
 		mr->r_trans->sync_mr(mr->r_trans_private, DMA_TO_DEVICE);
-		rds_mr_put(mr);
+		rm->m_rdma_mr = mr;
 	}
 	return err;
 }
@@ -673,5 +674,5 @@ int rds_cmsg_rdma_map(struct rds_sock *r
 	 || rm->m_rdma_cookie != 0)
 		return -EINVAL;
 
-	return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie);
+	return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->m_rdma_mr);
 }
Index: build-2.6/net/rds/rds.h
===================================================================
--- build-2.6.orig/net/rds/rds.h
+++ build-2.6/net/rds/rds.h
@@ -30,6 +30,7 @@
  */
 #define RDS_IB_PORT	18635
 #define RDS_TCP_PORT	18636
+#define RDS_IWARP_PORT	18637
 
 #ifndef AF_RDS
 #define AF_RDS          28      /* Reliable Datagram Socket     */
@@ -60,6 +61,7 @@
 /* XXX crap, we need to worry about this conflicting too */
 #define SYSCTL_NET_RDS 9912
 #define SYSCTL_NET_RDS_IB 100
+#define SYSCTL_NET_RDS_IWARP 101
 
 #ifdef DEBUG
 #define rdsdebug(fmt, args...) pr_debug("%s(): " fmt, __func__ , ##args)
@@ -282,6 +284,7 @@ struct rds_incoming {
 #define RDS_MSG_RETRANSMITTED	5
 #define RDS_MSG_MAPPED		6
 #define RDS_MSG_PAGEVEC		7
+#define RDS_MSG_FASTREG_POSTED	8
 
 struct rds_message {
 	atomic_t		m_refcount;
@@ -301,6 +304,7 @@ struct rds_message {
 	struct rds_sock		*m_rs;
 	struct rds_rdma_op	*m_rdma_op;
 	rds_rdma_cookie_t	m_rdma_cookie;
+	struct rds_mr		*m_rdma_mr;
 	unsigned int		m_nents;
 	unsigned int		m_count;
 	struct scatterlist	m_sg[0];
Index: build-2.6/net/rds/ib_cm.c
===================================================================
--- build-2.6.orig/net/rds/ib_cm.c
+++ build-2.6/net/rds/ib_cm.c
@@ -142,16 +142,19 @@ static void rds_ib_cm_fill_conn_param(st
 			struct rds_ib_connect_private *dp,
 			u32 protocol_version)
 {
+	struct rds_ib_connection *ic = conn->c_transport_data;
+
 	memset(conn_param, 0, sizeof(struct rdma_conn_param));
 	/* XXX tune these? */
 	conn_param->responder_resources = 1;
 	conn_param->initiator_depth = 1;
-	conn_param->retry_count = 7;
-	conn_param->rnr_retry_count = 7;
 
-	if (dp) {
-		struct rds_ib_connection *ic = conn->c_transport_data;
+	if (!ic->i_iwarp) {
+		conn_param->retry_count = 7;
+		conn_param->rnr_retry_count = 7;
+	}
 
+	if (dp) {
 		memset(dp, 0, sizeof(*dp));
 		dp->dp_saddr = conn->c_laddr;
 		dp->dp_daddr = conn->c_faddr;
@@ -288,7 +291,7 @@ static int rds_ib_setup_qp(struct rds_co
 	 */
 	ret = rdma_create_qp(ic->i_cm_id, ic->i_pd, &attr);
 	if (ret) {
-		rdsdebug("ib_req_notify_cq failed: %d\n", ret);
+		rdsdebug("rdma_create_qp failed: %d\n", ret);
 		goto out;
 	}
 
@@ -442,6 +445,12 @@ static int rds_ib_cm_handle_connect(stru
 	ic->i_cm_id = cm_id;
 	cm_id->context = conn;
 
+	rds_ibdev = ib_get_client_data(cm_id->device, &rds_ib_client);
+
+	/* Remember whether this is IB or iWARP */
+	ic->i_iwarp = (cm_id->device->node_type == RDMA_NODE_RNIC);
+	ic->i_fastreg = rds_ibdev->use_fastreg;
+
  	/* We got halfway through setting up the ib_connection, if we
  	 * fail now, we have to take the long route out of this mess. */
  	destroy = 0;
@@ -462,7 +471,6 @@ static int rds_ib_cm_handle_connect(stru
  	}
 
 	/* update ib_device with this local ipaddr */
-	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 	ib_update_ipaddr_for_device(rds_ibdev, dp->dp_saddr);
 
  	return 0;
@@ -616,6 +624,17 @@ int rds_ib_conn_connect(struct rds_conne
 	src.sin_addr.s_addr = (__force u32)conn->c_laddr;
 	src.sin_port = (__force u16)htons(0);
 
+	/* First, bind to the local address and device. */
+	ret = rdma_bind_addr(ic->i_cm_id, (struct sockaddr *) &src);
+	if (ret) {
+		rdsdebug("rdma_bind_addr(%u.%u.%u.%u) failed: %d\n",
+				NIPQUAD(conn->c_laddr), ret);
+		goto out;
+	}
+
+	/* Now check the device type and set i_iwarp */
+	ic->i_iwarp = (ic->i_cm_id->device->node_type == RDMA_NODE_RNIC);
+
 	dest.sin_family = AF_INET;
 	dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
 	dest.sin_port = (__force u16)htons(RDS_IB_PORT);
@@ -662,8 +681,9 @@ void rds_ib_conn_shutdown(struct rds_con
 				   " cm: %p err %d\n", ic->i_cm_id, err);
 		}
 
-		/* Always move the QP to error state */
-		if (ic->i_cm_id->qp) {
+		/* For IB, we have to move the QP to error state.
+		 * This is not needed for iWARP */
+		if (ic->i_cm_id->qp && !ic->i_iwarp) {
 			qp_attr.qp_state = IB_QPS_ERR;
 			err = ib_modify_qp(ic->i_cm_id->qp, &qp_attr, IB_QP_STATE);
 			if (err) {
Index: build-2.6/net/rds/ib_send.c
===================================================================
--- build-2.6.orig/net/rds/ib_send.c
+++ build-2.6/net/rds/ib_send.c
@@ -165,6 +165,8 @@ void rds_ib_send_clear_ring(struct rds_i
 			rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
 		if (send->s_op)
 			rds_ib_send_unmap_rdma(ic, send->s_op);
+		if (send->s_fastreg)
+			rds_ib_fastreg_release(send->s_fastreg);
 	}
 }
 
@@ -195,7 +197,7 @@ void rds_ib_send_cq_comp_handler(struct 
 	while (ib_poll_cq(cq, 1, &wc) > 0 ) {
 		rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
 			 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
-			 be32_to_cpu(wc.imm_data));
+			 be32_to_cpu(wc.ex.imm_data));
 		rds_ib_stats_inc(s_ib_tx_cq_event);
 
 		if (wc.wr_id == RDS_IB_ACK_WR_ID) {
@@ -223,6 +225,16 @@ void rds_ib_send_cq_comp_handler(struct 
 				/* Nothing to be done - the SG list will be unmapped
 				 * when the SEND completes. */
 				break;
+			case IB_WR_LOCAL_INV:
+				/* We invalidated an r_key. the caller may want to
+				 * learn about this. */
+				if (send->s_fastreg)
+					rds_ib_local_inv_complete(send->s_fastreg, wc.status);
+				break;
+			case IB_WR_FAST_REG_MR:
+				if (send->s_fastreg)
+					rds_ib_fast_reg_complete(send->s_fastreg, wc.status);
+				break;
 			default:
 				if (printk_ratelimit())
 					printk(KERN_NOTICE
@@ -261,7 +273,7 @@ void rds_ib_send_cq_comp_handler(struct 
 			 * queue_delay_work will not do anything if the work
 			 * struct is already queued, so we need to cancel it first.
 			 */
-			cancel_delayed_work(&conn->c_send_w);
+			cancel_delayed_work(&conn->c_send_w); /* FIXME barf */
 			queue_delayed_work(rds_wq, &conn->c_send_w, 0);
 		}
 
@@ -490,6 +502,21 @@ int rds_ib_xmit(struct rds_connection *c
 	else
 		i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
 
+	/* Fastreg support */
+	if (rds_rdma_cookie_key(rm->m_rdma_cookie)
+	 && ic->i_fastreg
+	 && !test_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags)) {
+		ret = rds_ib_xmit_fastreg(conn, rm->m_rdma_mr);
+		if (ret)
+			goto out;
+
+		/* We don't release the fastreg yet - we can only
+		 * do that when it has completed. If the connection
+		 * goes down, and we re-queue the message, we would
+		 * have to retry the registration. */
+		set_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags);
+	}
+
 	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
 	if (work_alloc == 0) {
 		set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
@@ -849,6 +876,108 @@ out:
 	return ret;
 }
 
+static int __rds_ib_xmit_fastreg(struct rds_connection *conn,
+				 struct rds_ib_fastreg *frr)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rds_ib_send_work *send = NULL;
+	struct rds_ib_send_work *first;
+	struct ib_send_wr *failed_wr;
+	u32 pos;
+	u32 work_alloc = 0;
+	int ret;
+	int num_wrs;
+
+	/*
+	 * Perform 2 WRs for the fast_reg_mr's and chain them together.  The
+	 * first WR is used to invalidate the old rkey, and the second WR is
+	 * used to define the new fast_reg_mr request.  Each individual page
+	 * in the sg list is added to the fast reg page list and placed
+	 * inside the fast_reg_mr WR.  The key used is a rolling 8-bit
+	 * counter, which should guarantee uniqueness.
+	 */
+	num_wrs = 0;
+	if (frr->f_old_rkey != RDS_IB_INVALID_FASTREG_KEY)
+		num_wrs++;
+	if (frr->f_page_list)
+		num_wrs++;
+	if (!num_wrs)
+		return 0;
+
+	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, num_wrs, &pos);
+	if (work_alloc != num_wrs) {
+		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		rds_ib_stats_inc(s_ib_tx_ring_full);
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	first = send = &ic->i_sends[pos];
+
+	if (frr->f_old_rkey != RDS_IB_INVALID_FASTREG_KEY) {
+		memset(send, 0, sizeof(*send));
+		send->s_wr.opcode = IB_WR_LOCAL_INV;
+		send->s_wr.ex.invalidate_rkey = frr->f_old_rkey;
+		send->s_fastreg = frr;
+		send->s_queued = jiffies;
+
+		/* Get the next WR */
+		pos = (pos + 1) % ic->i_send_ring.w_nr;
+		send = &ic->i_sends[pos];
+	}
+
+	if (frr->f_page_list) {
+		memset(send, 0, sizeof(*send));
+		send->s_wr.opcode = IB_WR_FAST_REG_MR;
+		send->s_wr.wr.fast_reg.length = frr->f_length;
+		send->s_wr.wr.fast_reg.rkey = frr->f_rkey;
+		send->s_wr.wr.fast_reg.page_list = frr->f_page_list;
+		send->s_wr.wr.fast_reg.page_list_len = frr->f_page_list_len;
+		send->s_wr.wr.fast_reg.page_shift = frr->f_page_shift;
+		send->s_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
+					IB_ACCESS_REMOTE_READ |
+					IB_ACCESS_REMOTE_WRITE;
+		send->s_fastreg = frr;
+		send->s_queued = jiffies;
+	}
+
+	atomic_add(num_wrs, &frr->f_refcnt);
+
+	/* Chain the two WRs together */
+	if (num_wrs == 2)
+		first->s_wr.next = &send->s_wr;
+
+	failed_wr = &first->s_wr;
+	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
+
+	rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
+		 first, &first->s_wr, ret, failed_wr);
+	BUG_ON(failed_wr != &first->s_wr);
+	if (ret) {
+		printk(KERN_WARNING "RDS/IB: fastreg ib_post_send to %u.%u.%u.%u "
+		       "returned %d\n", NIPQUAD(conn->c_faddr), ret);
+		while (num_wrs--)
+			rds_ib_fastreg_release(frr);
+		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		return ret;
+	}
+
+out:
+	return ret;
+}
+
+int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr)
+{
+	struct rds_ib_fastreg *frr;
+
+	frr = rds_ib_rdma_get_fastreg(mr);
+	if (!frr)
+		return 0;
+	if (IS_ERR(frr))
+		return PTR_ERR(frr);
+	return __rds_ib_xmit_fastreg(conn, frr);
+}
+
 void rds_ib_xmit_complete(struct rds_connection *conn)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
Index: build-2.6/net/rds/send.c
===================================================================
--- build-2.6.orig/net/rds/send.c
+++ build-2.6/net/rds/send.c
@@ -84,6 +84,10 @@ void rds_send_reset(struct rds_connectio
 	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
 		set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
 		set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
+		/* If we were in the process of performing a fastreg
+		 * memory registration when the connection went down,
+		 * we have to retry it. */
+		clear_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags);
 	}
 	list_splice_init(&conn->c_retrans, &conn->c_send_queue);
 	spin_unlock_irqrestore(&conn->c_lock, flags);
@@ -765,6 +769,9 @@ static int rds_cmsg_send(struct rds_sock
 		if (cmsg->cmsg_level != SOL_RDS)
 			continue;
 
+		/* As a side effect, RDMA_DEST and RDMA_MAP will set
+		 * rm->m_rdma_cookie and rm->m_rdma_mr.
+		 */
 		switch (cmsg->cmsg_type) {
 		case RDS_CMSG_RDMA_ARGS:
 			ret = rds_cmsg_rdma_args(rs, rm, cmsg);
Index: build-2.6/net/rds/message.c
===================================================================
--- build-2.6.orig/net/rds/message.c
+++ build-2.6/net/rds/message.c
@@ -71,6 +71,8 @@ static void rds_message_purge(struct rds
 
 	if (rm->m_rdma_op)
 		rds_rdma_free_op(rm->m_rdma_op);
+	if (rm->m_rdma_mr)
+		rds_mr_put(rm->m_rdma_mr);
 }
 
 void rds_message_inc_purge(struct rds_incoming *inc)
Index: build-2.6/net/rds/rdma.h
===================================================================
--- build-2.6.orig/net/rds/rdma.h
+++ build-2.6/net/rds/rdma.h
@@ -74,4 +74,11 @@ int rds_cmsg_rdma_map(struct rds_sock *r
 void rds_rdma_free_op(struct rds_rdma_op *ro);
 void rds_rdma_send_complete(struct rds_message *rm, int);
 
+extern void __rds_put_mr_final(struct rds_mr *mr);
+static inline void rds_mr_put(struct rds_mr *mr)
+{
+	if (atomic_dec_and_test(&mr->r_refcount))
+		__rds_put_mr_final(mr);
+}
+
 #endif
Index: build-2.6/net/rds/ib_recv.c
===================================================================
--- build-2.6.orig/net/rds/ib_recv.c
+++ build-2.6/net/rds/ib_recv.c
@@ -796,7 +796,7 @@ void rds_ib_recv_cq_comp_handler(struct 
 	while (ib_poll_cq(cq, 1, &wc) > 0) {
 		rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
 			 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
-			 be32_to_cpu(wc.imm_data));
+			 be32_to_cpu(wc.ex.imm_data));
 		rds_ib_stats_inc(s_ib_rx_cq_event);
 
 		recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];