On Thursday 03 July 2008 23:34:12 Jon Mason wrote:
> This patch adds support for running RDS over iWARP adapters. It
Hi Jon,
I took your patch and tried to isolate the iWARP-specific changes
for bcopy mode, and roll them into a smaller patch that doesn't duplicate
all the ib*.[hc] files.
I also tried to come to a working solution for RDMA - as you can
see from the deluge of messages I wrote on this :-) the approach you
chose has some problems.
Please take a look at the attached patch and let me know whether
(a) bcopy mode works, and (b) the RDMA approach can work with
iWARP NICs.
Olaf
--
Olaf Kirch | --- o --- Nous sommes du soleil we love when we play
[EMAIL PROTECTED] | / | \ sol.dhoop.naytheet.ah kin.ir.samse.qurax
From: Olaf Kirch <[EMAIL PROTECTED]>
Subject: [PATCH RFC] RDS: Add iWARP Support
This is based on the work posted by Jon Mason. It extracts
the iWARP-specific changes that are needed to support bcopy
mode (I hope I caught all of them).
I also did some work on RDMA support. This is a lot harder,
because the interface and implementation were designed with
classic MRs in mind. However, I think the approach taken below
may result in a working implementation (it's not working yet - I left
some blanks and BUG() asserts in there, because I wanted to get this
patch out as an RFC sooner rather than later).
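In case it helps review, the gist of the approach is a small per-pool
ops table, so the existing FMR code and the new fastreg code can share
the pool infrastructure (excerpted from the patch below):

    struct rds_ib_mr_pool_ops {
        /* allocate the HW resources backing one MR */
        int (*init)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
        /* map a scatterlist into the MR */
        int (*map)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
                struct scatterlist *sg, unsigned int sg_len);
        /* batched invalidate of a list of dirty MRs */
        void (*unmap)(struct rds_ib_mr_pool *, struct list_head *);
        /* release the HW resources */
        void (*destroy)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
    };

IB HCAs keep using rds_ib_fmr_pool_ops; devices advertising
IB_DEVICE_MEM_MGT_EXTENSIONS get rds_ib_fastreg_pool_ops instead.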
Also, this is a pretty large patch - it needs to be broken down into
half a dozen or so smaller functional changes for better review.
Olaf
---
net/rds/ib.c | 30 ++
net/rds/ib.h | 55 ++++
net/rds/ib_cm.c | 36 ++-
net/rds/ib_rdma.c | 610 +++++++++++++++++++++++++++++++++++++++++++++---------
net/rds/ib_recv.c | 2
net/rds/ib_send.c | 133 +++++++++++
net/rds/message.c | 2
net/rds/rdma.c | 17 -
net/rds/rdma.h | 7
net/rds/rds.h | 4
net/rds/send.c | 7
11 files changed, 778 insertions(+), 125 deletions(-)
Index: build-2.6/net/rds/ib.c
===================================================================
--- build-2.6.orig/net/rds/ib.c
+++ build-2.6/net/rds/ib.c
@@ -42,6 +42,7 @@
unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE;
unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned MRs */
+unsigned int fastreg_pool_size = RDS_FMR_POOL_SIZE;
module_param(fmr_pool_size, int, 0444);
MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA");
@@ -85,21 +86,38 @@ void rds_ib_add_one(struct ib_device *de
rds_ibdev->fmr_page_size = 1 << rds_ibdev->fmr_page_shift;
rds_ibdev->fmr_page_mask = ~((u64) rds_ibdev->fmr_page_size - 1);
rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
- rds_ibdev->max_fmrs = dev_attr->max_fmr?
- min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
- fmr_pool_size;
+ rds_ibdev->max_fmrs = dev_attr->max_fmr;
rds_ibdev->dev = device;
rds_ibdev->pd = ib_alloc_pd(device);
if (IS_ERR(rds_ibdev->pd))
goto free_dev;
- rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
- IB_ACCESS_LOCAL_WRITE);
+ if (device->node_type != RDMA_NODE_RNIC) {
+ rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
+ IB_ACCESS_LOCAL_WRITE);
+ } else {
+ /* Why does it have to have these permissions? */
+ rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
+ IB_ACCESS_REMOTE_READ |
+ IB_ACCESS_REMOTE_WRITE |
+ IB_ACCESS_LOCAL_WRITE);
+ }
if (IS_ERR(rds_ibdev->mr))
goto err_pd;
- rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
+ /* Create the MR pool. We choose different strategies for
+ * MRs depending on the hardware.
+ */
+ if (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
+ /* Use fast registrations */
+ rds_ibdev->mr_pool = rds_ib_create_fastreg_pool(rds_ibdev);
+ rds_ibdev->use_fastreg = 1;
+ } else {
+ /* Default: use FMRs. Would be nice if there was
+ * a capability flag to test for. */
+ rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
+ }
if (IS_ERR(rds_ibdev->mr_pool)) {
rds_ibdev->mr_pool = NULL;
goto err_mr;
Index: build-2.6/net/rds/ib.h
===================================================================
--- build-2.6.orig/net/rds/ib.h
+++ build-2.6/net/rds/ib.h
@@ -49,9 +49,51 @@ struct rds_ib_connect_private {
__be32 dp_credit; /* non-zero enables flow ctl */
};
+struct rds_ib_scatterlist {
+ struct scatterlist * list;
+ unsigned int len;
+ int dma_len;
+};
+
+/* We need to post a LOCAL_INV request unless f_old_rkey
+ * has this value. */
+#define RDS_IB_INVALID_FASTREG_KEY 0
+
+struct rds_ib_fastreg {
+ atomic_t f_refcnt;
+ unsigned int f_posted : 1,
+ f_done : 1;
+
+ u32 f_old_rkey;
+
+ u32 f_rkey;
+ unsigned int f_length;
+
+ struct rds_ib_scatterlist f_sg;
+
+ struct ib_fast_reg_page_list *f_page_list;
+ unsigned int f_page_list_len;
+ unsigned int f_page_shift;
+
+#if 0
+ u32 f_invalidate_rkey;
+ struct ib_send_wr f_wr;
+ wait_queue_head_t f_waitq;
+ struct list_head f_list;
+ unsigned int f_done;
+ int f_status;
+#endif
+
+ struct rds_ib_mr *f_mr;
+};
+
struct rds_ib_send_work {
struct rds_message *s_rm;
+
+ /* We should really put these into a union: */
struct rds_rdma_op *s_op;
+ struct rds_ib_fastreg *s_fastreg;
+
struct ib_send_wr s_wr;
struct ib_sge s_sge[RDS_IB_MAX_SGE];
unsigned long s_queued;
@@ -86,6 +128,7 @@ struct rds_ib_connection {
struct rds_header *i_send_hdrs;
u64 i_send_hdrs_dma;
struct rds_ib_send_work *i_sends;
+ struct list_head i_fastreg_pending;
/* rx */
struct mutex i_recv_mutex;
@@ -123,7 +166,9 @@ struct rds_ib_connection {
atomic_t i_credits;
/* Protocol version specific information */
- unsigned int i_flowctl : 1; /* enable/disable flow ctl */
+ unsigned int i_flowctl : 1, /* enable/disable flow ctl */
+ i_iwarp : 1, /* this is actually iWARP not IB */
+ i_fastreg : 1; /* use fastreg */
/* Batched completions */
unsigned int i_unsignaled_wrs;
@@ -154,6 +199,7 @@ struct rds_ib_device {
unsigned int fmr_max_remaps;
unsigned int max_fmrs;
int max_sge;
+ unsigned int use_fastreg : 1;
spinlock_t spinlock;
};
@@ -236,6 +282,7 @@ extern void rds_ib_remove_one(struct ib_
extern struct ib_client rds_ib_client;
extern unsigned int fmr_pool_size;
+extern unsigned int fastreg_pool_size;
extern unsigned int fmr_message_size;
/* ib_cm.c */
@@ -254,6 +301,7 @@ void __rds_ib_conn_error(struct rds_conn
/* ib_rdma.c */
int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr);
struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *);
+struct rds_ib_mr_pool *rds_ib_create_fastreg_pool(struct rds_ib_device *);
void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_ib_connection *iinfo);
void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
@@ -261,6 +309,10 @@ void *rds_ib_get_mr(struct scatterlist *
void rds_ib_sync_mr(void *trans_private, int dir);
void rds_ib_free_mr(void *trans_private, int invalidate);
void rds_ib_flush_mrs(void);
+struct rds_ib_fastreg *rds_ib_rdma_get_fastreg(struct rds_mr *);
+void rds_ib_fastreg_release(struct rds_ib_fastreg *frr);
+void rds_ib_local_inv_complete(struct rds_ib_fastreg *frr, int status);
+void rds_ib_fast_reg_complete(struct rds_ib_fastreg *frr, int status);
/* ib_recv.c */
int __init rds_ib_recv_init(void);
@@ -298,6 +350,7 @@ void rds_ib_send_cq_comp_handler(struct
void rds_ib_send_init_ring(struct rds_ib_connection *ic);
void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op);
+int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr);
void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
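To keep the fastreg refcounting straight while reading ib_rdma.c and
ib_send.c below, this is the lifecycle as I read it (summary only, not
part of the patch):

    /* Lifecycle of a struct rds_ib_fastreg:
     *
     * rds_ib_map_fastreg()        refcnt = 1, ibmr->u.iwarp.pending = frr
     * rds_ib_rdma_get_fastreg()   refcnt++ on behalf of the caller
     * __rds_ib_xmit_fastreg()     refcnt += num_wrs, one per posted WR
     * rds_ib_local_inv_complete()
     * rds_ib_fast_reg_complete()  each drops the reference its WR owned
     * rds_ib_fastreg_release()    frees page list, scatterlist and frr
     *                             once the count reaches zero
     */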
Index: build-2.6/net/rds/ib_rdma.c
===================================================================
--- build-2.6.orig/net/rds/ib_rdma.c
+++ build-2.6/net/rds/ib_rdma.c
@@ -45,20 +45,31 @@ extern struct list_head rds_ib_devices;
struct rds_ib_mr {
struct rds_ib_device *device;
struct rds_ib_mr_pool *pool;
- struct ib_fmr *fmr;
+
+ spinlock_t lock;
+ union {
+ struct {
+ struct ib_fmr *fmr;
+ } ib;
+ struct {
+ struct ib_fast_reg_page_list *page_list;
+ struct ib_mr *fastreg_mr;
+ u32 rkey;
+ struct rds_ib_fastreg *pending;
+ } iwarp;
+ } u;
struct list_head list;
unsigned int remap_count;
- struct scatterlist * sg;
- unsigned int sg_len;
- u64 * dma;
- int sg_dma_len;
+ struct rds_ib_scatterlist sg;
};
/*
* Our own little FMR pool
*/
struct rds_ib_mr_pool {
+ struct rds_ib_device * device;
+
struct mutex flush_lock; /* serialize fmr invalidate */
struct work_struct flush_worker; /* flush worker */
@@ -68,16 +79,57 @@ struct rds_ib_mr_pool {
struct list_head drop_list; /* MRs that have reached their max_maps limit */
struct list_head free_list; /* unused MRs */
struct list_head clean_list; /* unused & unmapped MRs */
+ struct list_head fastreg_list; /* pending fastreg's */
atomic_t free_pinned; /* memory pinned by free MRs */
+ unsigned long max_message_size; /* in pages */
unsigned long max_items;
unsigned long max_items_soft;
unsigned long max_free_pinned;
struct ib_fmr_attr fmr_attr;
+
+ /* Dummy QP used to handle invalidate for fastreg */
+ struct ib_qp *qp;
+
+ struct rds_ib_mr_pool_ops *op;
+};
+
+struct rds_ib_mr_pool_ops {
+ int (*init)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
+ int (*map)(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr,
+ struct scatterlist *sg, unsigned int sg_len);
+ void (*unmap)(struct rds_ib_mr_pool *, struct list_head *);
+ void (*destroy)(struct rds_ib_mr_pool *, struct rds_ib_mr *);
};
static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all);
static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
+static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr,
+ struct scatterlist *sg, unsigned int nents);
+static void rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool, struct list_head *unmap_list);
+static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr,
+ struct scatterlist *sg, unsigned int nents);
+static void rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool, struct list_head *unmap_list);
+static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr *ibmr);
+
+static struct rds_ib_mr_pool_ops rds_ib_fmr_pool_ops = {
+ .init = rds_ib_init_fmr,
+ .map = rds_ib_map_fmr,
+ .unmap = rds_ib_unmap_fmr_list,
+ .destroy = rds_ib_destroy_fmr,
+};
+
+static struct rds_ib_mr_pool_ops rds_ib_fastreg_pool_ops = {
+ .init = rds_ib_init_fastreg,
+ .map = rds_ib_map_fastreg,
+ .unmap = rds_ib_unmap_fastreg_list,
+ .destroy = rds_ib_destroy_fastreg,
+};
int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
{
@@ -124,7 +176,158 @@ struct rds_ib_device* ib_get_device(__be
return NULL;
}
-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+static void rds_ib_set_scatterlist(struct rds_ib_scatterlist *sg,
+ struct scatterlist *list,
+ unsigned int sg_len, unsigned int sg_dma_len)
+{
+ sg->list = list;
+ sg->len = sg_len;
+ sg->dma_len = sg_dma_len;
+}
+
+static void rds_ib_rdma_drop_scatterlist(struct rds_ib_device *rds_ibdev,
+ struct rds_ib_scatterlist *sg)
+{
+ if (sg->dma_len) {
+ ib_dma_unmap_sg(rds_ibdev->dev,
+ sg->list, sg->len,
+ DMA_BIDIRECTIONAL);
+ sg->dma_len = 0;
+ }
+
+ /* Release the s/g list */
+ if (sg->len) {
+ unsigned int i;
+
+ for (i = 0; i < sg->len; ++i) {
+ struct page *page = sg_page(&sg->list[i]);
+
+ /* FIXME we need a way to tell a r/w MR
+ * from a r/o MR */
+ set_page_dirty(page);
+ put_page(page);
+ }
+ kfree(sg->list);
+
+ sg->list = NULL;
+ sg->len = 0;
+ }
+}
+
+/*
+ * IB FMR handling
+ */
+static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr)
+{
+ struct rds_ib_device *rds_ibdev = pool->device;
+ struct ib_fmr *fmr;
+
+ fmr = ib_alloc_fmr(rds_ibdev->pd,
+ (IB_ACCESS_LOCAL_WRITE |
+ IB_ACCESS_REMOTE_READ |
+ IB_ACCESS_REMOTE_WRITE),
+ &pool->fmr_attr);
+ if (IS_ERR(fmr)) {
+ int err = PTR_ERR(fmr);
+
+ printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
+ return err;
+ }
+
+ ibmr->u.ib.fmr = fmr;
+ return 0;
+}
+
+static void rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool,
+ struct list_head *unmap_list)
+{
+ struct rds_ib_mr *ibmr;
+ LIST_HEAD(fmr_list);
+ int ret;
+
+ /* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
+ list_for_each_entry(ibmr, unmap_list, list)
+ list_add(&ibmr->u.ib.fmr->list, &fmr_list);
+ ret = ib_unmap_fmr(&fmr_list);
+ if (ret)
+ printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
+}
+
+static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr)
+{
+ if (ibmr->u.ib.fmr)
+ ib_dealloc_fmr(ibmr->u.ib.fmr);
+ ibmr->u.ib.fmr = NULL;
+}
+
+/*
+ * iWARP fastreg handling
+ */
+static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr)
+{
+ struct rds_ib_device *rds_ibdev = pool->device;
+ struct ib_mr *mr;
+
+ mr = ib_alloc_fast_reg_mr(rds_ibdev->pd, pool->max_message_size);
+ if (IS_ERR(mr)) {
+ int err = PTR_ERR(mr);
+
+ printk(KERN_WARNING "RDS/IWARP: ib_alloc_fast_reg_mr failed (err=%d)\n", err);
+ return err;
+ }
+
+ ibmr->u.iwarp.rkey = RDS_IB_INVALID_FASTREG_KEY;
+ ibmr->u.iwarp.fastreg_mr = mr;
+ return 0;
+}
+
+static void rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
+ struct list_head *unmap_list)
+{
+ LIST_HEAD(fmr_list);
+
+ /* Batched invalidation of fastreg MRs.
+ * Why do we do it this way, even though we could pipeline unmap
+ * and remap? The reason is the application semantics - when the
+ * application requests an invalidation of MRs, it expects all
+ * previously released R_Keys to become invalid.
+ *
+ * If we implement MR reuse naively, we risk memory corruption
+ * (this has actually been observed). So the default behavior
+ * requires that an MR go through an explicit unmap operation before
+ * we can reuse it.
+ *
+ * We could probably improve on this a little, by allowing immediate
+ * reuse of an MR on the same socket (e.g. you could add a small
+ * cache of unused MRs to struct rds_socket - GET_MR could grab one
+ * of these without requiring an explicit invalidate).
+ */
+
+ /* Fill in the blanks:
+ * Go through the list of dirty MRs, and post LOCAL_INV WRs to the
+ * dummy pool->qp. When the completion for the last WR arrives,
+ * the CQ handler wakes up the caller.
+ */
+ BUG(); /* not implemented yet. */
+}
+
+static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr)
+{
+ if (ibmr->u.iwarp.page_list)
+ ib_free_fast_reg_page_list(ibmr->u.iwarp.page_list);
+ if (ibmr->u.iwarp.fastreg_mr)
+ ib_dereg_mr(ibmr->u.iwarp.fastreg_mr);
+ if (ibmr->u.iwarp.pending)
+ rds_ib_fastreg_release(ibmr->u.iwarp.pending);
+}
+
+struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
+ unsigned int message_size, unsigned int pool_size,
+ struct rds_ib_mr_pool_ops *ops)
{
struct rds_ib_mr_pool *pool;
@@ -132,25 +335,68 @@ struct rds_ib_mr_pool *rds_ib_create_mr_
if (!pool)
return ERR_PTR(-ENOMEM);
+ pool->device = rds_ibdev;
INIT_LIST_HEAD(&pool->free_list);
INIT_LIST_HEAD(&pool->drop_list);
INIT_LIST_HEAD(&pool->clean_list);
+ INIT_LIST_HEAD(&pool->fastreg_list);
mutex_init(&pool->flush_lock);
spin_lock_init(&pool->list_lock);
INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
- pool->fmr_attr.max_pages = fmr_message_size;
- pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
- pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
- pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4;
+ pool->max_message_size = message_size;
+ pool->max_items = pool_size;
+ pool->max_free_pinned = pool->max_items * pool->max_message_size / 4;
/* We never allow more than max_items MRs to be allocated.
* When we exceed more than max_items_soft, we start freeing
* items more aggressively.
* Make sure that max_items > max_items_soft > max_items / 2
*/
- pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
- pool->max_items = rds_ibdev->max_fmrs;
+ pool->max_items_soft = pool->max_items * 3 / 4;
+
+ return pool;
+}
+
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+{
+ struct rds_ib_mr_pool *pool;
+ unsigned int pool_size = fmr_pool_size;
+
+ if (rds_ibdev->max_fmrs && rds_ibdev->max_fmrs < pool_size)
+ pool_size = rds_ibdev->max_fmrs;
+
+ pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size, pool_size,
+ &rds_ib_fmr_pool_ops);
+
+ if (!IS_ERR(pool)) {
+ pool->fmr_attr.max_pages = pool->max_message_size;
+ pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
+ pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
+ }
+
+ return pool;
+}
+
+struct rds_ib_mr_pool *rds_ib_create_fastreg_pool(struct rds_ib_device *rds_ibdev)
+{
+ struct rds_ib_mr_pool *pool;
+ unsigned int pool_size = fmr_pool_size;
+
+ if (rds_ibdev->max_fmrs && rds_ibdev->max_fmrs < pool_size)
+ pool_size = rds_ibdev->max_fmrs;
+
+ pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size,
+ fastreg_pool_size,
+ &rds_ib_fastreg_pool_ops);
+
+ if (!IS_ERR(pool)) {
+ /* Fill in the blanks:
+ * create a dummy QP to which we can post LOCAL_INV
+ * requests when invalidating MRs
+ */
+ pool->qp = NULL;
+ }
return pool;
}
@@ -169,6 +415,10 @@ void rds_ib_destroy_mr_pool(struct rds_i
rds_ib_flush_mr_pool(pool, 1);
BUG_ON(atomic_read(&pool->item_count));
BUG_ON(atomic_read(&pool->free_pinned));
+
+ if (pool->qp)
+ ib_destroy_qp(pool->qp);
+
kfree(pool);
}
@@ -227,77 +477,82 @@ static struct rds_ib_mr *rds_ib_alloc_fm
goto out_no_cigar;
}
- ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
- (IB_ACCESS_LOCAL_WRITE |
- IB_ACCESS_REMOTE_READ |
- IB_ACCESS_REMOTE_WRITE),
- &pool->fmr_attr);
- if (IS_ERR(ibmr->fmr)) {
- err = PTR_ERR(ibmr->fmr);
- ibmr->fmr = NULL;
- printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
+ spin_lock_init(&ibmr->lock);
+
+ err = pool->op->init(pool, ibmr);
+ if (err)
goto out_no_cigar;
- }
rds_ib_stats_inc(s_ib_rdma_mr_alloc);
return ibmr;
out_no_cigar:
if (ibmr) {
- if (ibmr->fmr)
- ib_dealloc_fmr(ibmr->fmr);
+ pool->op->destroy(pool, ibmr);
kfree(ibmr);
}
atomic_dec(&pool->item_count);
return ERR_PTR(err);
}
-static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
- struct scatterlist *sg, unsigned int nents)
+static int rds_ib_count_dma_pages(struct rds_ib_device *rds_ibdev,
+ struct scatterlist *sg, unsigned int sg_dma_len,
+ unsigned int *lenp)
{
struct ib_device *dev = rds_ibdev->dev;
- struct scatterlist *scat = sg;
- u64 io_addr = 0;
- u64 *dma_pages;
- u32 len;
- int page_cnt, sg_dma_len;
- int i, j;
- int ret;
-
- sg_dma_len = ib_dma_map_sg(dev, sg, nents,
- DMA_BIDIRECTIONAL);
- if (unlikely(!sg_dma_len)) {
- printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
- return -EBUSY;
- }
-
- len = 0;
- page_cnt = 0;
+ unsigned int i, page_cnt = 0, len = 0;
for (i = 0; i < sg_dma_len; ++i) {
- unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
- u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
+ unsigned int dma_len = ib_sg_dma_len(dev, &sg[i]);
+ u64 dma_addr = ib_sg_dma_address(dev, &sg[i]);
if (dma_addr & ~rds_ibdev->fmr_page_mask) {
if (i > 0)
return -EINVAL;
- else
- ++page_cnt;
+ ++page_cnt;
}
if ((dma_addr + dma_len) & ~rds_ibdev->fmr_page_mask) {
if (i < sg_dma_len - 1)
return -EINVAL;
- else
- ++page_cnt;
+ ++page_cnt;
}
len += dma_len;
}
page_cnt += len >> rds_ibdev->fmr_page_shift;
- if (page_cnt > fmr_message_size)
+ if (page_cnt > rds_ibdev->mr_pool->max_message_size)
return -EINVAL;
+ if (lenp)
+ *lenp = len;
+ return page_cnt;
+}
+
+static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr,
+ struct scatterlist *sg, unsigned int nents)
+{
+ struct rds_ib_device *rds_ibdev = pool->device;
+ struct ib_device *dev = rds_ibdev->dev;
+ struct scatterlist *scat = sg;
+ u64 io_addr = 0;
+ u64 *dma_pages;
+ int page_cnt, sg_dma_len;
+ int i, j;
+ int ret;
+
+ sg_dma_len = ib_dma_map_sg(dev, sg, nents,
+ DMA_BIDIRECTIONAL);
+ if (unlikely(!sg_dma_len)) {
+ printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
+ return -EBUSY;
+ }
+
+ /* FIXME: when returning an error, we need to unmap the SG */
+
+ page_cnt = rds_ib_count_dma_pages(rds_ibdev, sg, sg_dma_len, NULL);
+ if (page_cnt < 0)
+ return page_cnt;
+
dma_pages = kmalloc(sizeof(u64) * page_cnt, GFP_ATOMIC);
if (!dma_pages)
return -ENOMEM;
@@ -312,7 +567,7 @@ static int rds_ib_map_fmr(struct rds_ib_
(dma_addr & rds_ibdev->fmr_page_mask) + j;
}
- ret = ib_map_phys_fmr(ibmr->fmr,
+ ret = ib_map_phys_fmr(ibmr->u.ib.fmr,
dma_pages, page_cnt, io_addr);
if (ret)
goto out;
@@ -321,9 +576,9 @@ static int rds_ib_map_fmr(struct rds_ib_
* safely tear down the old mapping. */
rds_ib_teardown_mr(ibmr);
- ibmr->sg = scat;
- ibmr->sg_len = nents;
- ibmr->sg_dma_len = sg_dma_len;
+ ibmr->sg.list = scat;
+ ibmr->sg.len = nents;
+ ibmr->sg.dma_len = sg_dma_len;
ibmr->remap_count++;
rds_ib_stats_inc(s_ib_rdma_mr_used);
@@ -335,6 +590,192 @@ out:
return ret;
}
+static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
+ struct rds_ib_mr *ibmr,
+ struct scatterlist *sg, unsigned int sg_len)
+{
+ struct rds_ib_device *rds_ibdev = pool->device;
+ struct ib_device *dev = rds_ibdev->dev;
+ struct ib_fast_reg_page_list *page_list = NULL;
+ struct rds_ib_fastreg *frr;
+ unsigned int len;
+ int i, j, page_cnt, sg_dma_len = 0;
+ int ret;
+
+ BUG_ON(ibmr->u.iwarp.pending);
+
+ page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, pool->max_message_size);
+ if (IS_ERR(page_list)) {
+ ret = PTR_ERR(page_list);
+ page_list = NULL;
+
+ printk(KERN_WARNING "RDS/iWARP: ib_alloc_fast_reg_page_list failed (err=%d)\n", ret);
+ return ret;
+ }
+
+ sg_dma_len = ib_dma_map_sg(dev, sg, sg_len, DMA_BIDIRECTIONAL);
+ if (unlikely(!sg_dma_len)) {
+ printk(KERN_WARNING "RDS/iWARP: dma_map_sg failed!\n");
+ ret = -EBUSY;
+ goto out;
+ }
+
+ page_cnt = rds_ib_count_dma_pages(rds_ibdev, sg, sg_dma_len, &len);
+ if (page_cnt < 0) {
+ ret = page_cnt;
+ goto out;
+ }
+
+ page_cnt = 0;
+ for (i = 0; i < sg_dma_len; ++i) {
+ unsigned int dma_len = ib_sg_dma_len(dev, &sg[i]);
+ u64 dma_addr = ib_sg_dma_address(dev, &sg[i]);
+
+ for (j = 0; j < dma_len; j += rds_ibdev->fmr_page_size)
+ page_list->page_list[page_cnt++] =
+ (dma_addr & rds_ibdev->fmr_page_mask) + j;
+ }
+
+ /* Allocate the fastreg request structure */
+ frr = kzalloc(sizeof(*frr), GFP_KERNEL);
+ if (!frr) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ ib_update_fast_reg_key(ibmr->u.iwarp.fastreg_mr, ibmr->remap_count++);
+
+ /* Build the fastreg WR */
+ frr->f_mr = ibmr;
+ rds_ib_set_scatterlist(&frr->f_sg, sg, sg_len, sg_dma_len);
+ frr->f_length = len;
+ frr->f_rkey = ibmr->u.iwarp.fastreg_mr->rkey;
+ frr->f_page_list = page_list;
+ frr->f_page_list_len = page_cnt;
+ frr->f_page_shift = rds_ibdev->fmr_page_shift;
+
+ frr->f_old_rkey = ibmr->u.iwarp.rkey;
+
+ /* Attach the fastreg info to the MR */
+ atomic_set(&frr->f_refcnt, 1);
+ ibmr->u.iwarp.pending = frr;
+
+ rds_ib_stats_inc(s_ib_rdma_mr_used);
+ ret = 0;
+
+out:
+ if (ret) {
+ ib_free_fast_reg_page_list(page_list);
+ if (sg_dma_len)
+ ib_dma_unmap_sg(dev, sg, sg_dma_len, DMA_BIDIRECTIONAL);
+ }
+
+ return ret;
+}
+
+struct rds_ib_fastreg *rds_ib_rdma_get_fastreg(struct rds_mr *mr)
+{
+ struct rds_ib_mr *ibmr = mr->r_trans_private;
+ struct rds_ib_fastreg *frr;
+ unsigned long flags;
+
+ spin_lock_irqsave(&ibmr->lock, flags);
+ frr = ibmr->u.iwarp.pending;
+ if (frr) {
+ /* FIXME: we need to mark the frr as "locked"
+ * to prevent FREE_MR from trashing the MR
+ * as long as the fastreg is on the queue */
+ atomic_inc(&frr->f_refcnt);
+ }
+ spin_unlock_irqrestore(&ibmr->lock, flags);
+
+ return frr;
+}
+
+void rds_ib_fastreg_release(struct rds_ib_fastreg *frr)
+{
+ struct rds_ib_device *rds_ibdev = NULL;
+
+ if (atomic_dec_and_test(&frr->f_refcnt)) {
+ ib_free_fast_reg_page_list(frr->f_page_list);
+ BUG(); /* FIXME: obtain rds_ibdev */
+ rds_ib_rdma_drop_scatterlist(rds_ibdev, &frr->f_sg);
+ kfree(frr);
+ }
+}
+
+/*
+ * These functions are called back from the send CQ handler
+ * when the LOCAL_INV or FAST_REG_MR WRs complete.
+ */
+void rds_ib_local_inv_complete(struct rds_ib_fastreg *frr, int status)
+{
+ struct rds_ib_mr *ibmr = frr->f_mr;
+
+ spin_lock(&ibmr->lock);
+ if (ibmr->u.iwarp.pending != frr)
+ goto out_unlock;
+
+ if (status != IB_WC_SUCCESS) {
+ /* Yikes. Invalidation failed. What can we do but complain? */
+ printk(KERN_NOTICE "RDS/iWARP: Unable to invalidate fastreg MR.\n");
+ goto out_unlock;
+ }
+
+ if (frr->f_old_rkey == ibmr->u.iwarp.rkey) {
+ ibmr->u.iwarp.rkey = 0;
+ /* Now we can unpin any memory pinned for this MR. */
+ rds_ib_teardown_mr(ibmr);
+ }
+ frr->f_old_rkey = RDS_IB_INVALID_FASTREG_KEY;
+
+out_unlock:
+ spin_unlock(&ibmr->lock);
+
+ /* The WR owned a reference to this frr. Drop it */
+ rds_ib_fastreg_release(frr);
+}
+
+void rds_ib_fast_reg_complete(struct rds_ib_fastreg *frr, int status)
+{
+ struct rds_ib_mr *ibmr = frr->f_mr;
+
+ spin_lock(&ibmr->lock);
+
+ /* Technically, this would be a bug */
+ if (ibmr->u.iwarp.pending != frr)
+ goto out_unlock;
+
+ if (status != IB_WC_SUCCESS) {
+ /* Yikes. We were unable to register the application's
+ * memory. We have no way of notifying the application.
+ * We could probably tear down the QP and cry uncle, but
+ * the SEND may already have gone out.
+ * The only solace is that the RDMA initiated by the remote
+ * will fail, because the key isn't valid.
+ */
+ if (printk_ratelimit())
+ printk(KERN_NOTICE "RDS/iWARP: Unable to "
+ "perform fast memory registration.\n");
+ goto out_unlock;
+ }
+
+ ibmr->sg = frr->f_sg;
+ ibmr->u.iwarp.page_list = frr->f_page_list;
+ ibmr->u.iwarp.rkey = frr->f_rkey;
+
+ /* Detach frr from MR. We still have at least one ref after this */
+ ibmr->u.iwarp.pending = NULL;
+ rds_ib_fastreg_release(frr);
+ frr->f_done = 1;
+
+out_unlock:
+ spin_unlock(&ibmr->lock);
+
+ /* The WR owned a reference to this frr. Drop it */
+ rds_ib_fastreg_release(frr);
+}
+
void rds_ib_sync_mr(void *trans_private, int direction)
{
struct rds_ib_mr *ibmr = trans_private;
@@ -342,49 +783,24 @@ void rds_ib_sync_mr(void *trans_private,
switch (direction) {
case DMA_FROM_DEVICE:
- ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg,
- ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
+ ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg.list,
+ ibmr->sg.dma_len, DMA_BIDIRECTIONAL);
break;
case DMA_TO_DEVICE:
- ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg,
- ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
+ ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg.list,
+ ibmr->sg.dma_len, DMA_BIDIRECTIONAL);
break;
}
}
static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
{
- struct rds_ib_device *rds_ibdev = ibmr->device;
-
- if (ibmr->sg_dma_len) {
- ib_dma_unmap_sg(rds_ibdev->dev,
- ibmr->sg, ibmr->sg_len,
- DMA_BIDIRECTIONAL);
- ibmr->sg_dma_len = 0;
- }
-
- /* Release the s/g list */
- if (ibmr->sg_len) {
- unsigned int i;
-
- for (i = 0; i < ibmr->sg_len; ++i) {
- struct page *page = sg_page(&ibmr->sg[i]);
-
- /* FIXME we need a way to tell a r/w MR
- * from a r/o MR */
- set_page_dirty(page);
- put_page(page);
- }
- kfree(ibmr->sg);
-
- ibmr->sg = NULL;
- ibmr->sg_len = 0;
- }
+ rds_ib_rdma_drop_scatterlist(ibmr->device, &ibmr->sg);
}
void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
{
- unsigned int pinned = ibmr->sg_len;
+ unsigned int pinned = ibmr->sg.len;
__rds_ib_teardown_mr(ibmr);
if (pinned) {
@@ -419,7 +835,6 @@ int rds_ib_flush_mr_pool(struct rds_ib_m
{
struct rds_ib_mr *ibmr, *next;
LIST_HEAD(unmap_list);
- LIST_HEAD(fmr_list);
unsigned long unpinned = 0;
unsigned long flags;
unsigned int nfreed = 0, ncleaned = 0, free_goal;
@@ -443,21 +858,17 @@ int rds_ib_flush_mr_pool(struct rds_ib_m
if (list_empty(&unmap_list))
goto out;
- /* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
- list_for_each_entry(ibmr, &unmap_list, list)
- list_add(&ibmr->fmr->list, &fmr_list);
- ret = ib_unmap_fmr(&fmr_list);
- if (ret)
- printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
+ /* Batched invalidate of dirty MRs: */
+ pool->op->unmap(pool, &unmap_list);
/* Now we can destroy the DMA mapping and unpin any pages */
list_for_each_entry_safe(ibmr, next, &unmap_list, list) {
- unpinned += ibmr->sg_len;
+ unpinned += ibmr->sg.len;
__rds_ib_teardown_mr(ibmr);
if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
rds_ib_stats_inc(s_ib_rdma_mr_free);
list_del(&ibmr->list);
- ib_dealloc_fmr(ibmr->fmr);
+ pool->op->destroy(pool, ibmr);
kfree(ibmr);
nfreed++;
}
@@ -491,7 +902,7 @@ void rds_ib_free_mr(void *trans_private,
struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
unsigned long flags;
- rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
+ rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg.len);
/* Return it to the pool's free list */
spin_lock_irqsave(&pool->list_lock, flags);
@@ -500,7 +911,7 @@ void rds_ib_free_mr(void *trans_private,
} else {
list_add(&ibmr->list, &pool->free_list);
}
- atomic_add(ibmr->sg_len, &pool->free_pinned);
+ atomic_add(ibmr->sg.len, &pool->free_pinned);
atomic_inc(&pool->dirty_count);
spin_unlock_irqrestore(&pool->list_lock, flags);
@@ -536,6 +947,7 @@ void *rds_ib_get_mr(struct scatterlist *
__be32 ip_addr, u32 *key_ret)
{
struct rds_ib_device *rds_ibdev;
+ struct rds_ib_mr_pool *pool;
struct rds_ib_mr *ibmr = NULL;
int ret;
@@ -545,7 +957,7 @@ void *rds_ib_get_mr(struct scatterlist *
goto out;
}
- if (!rds_ibdev->mr_pool) {
+ if (!(pool = rds_ibdev->mr_pool)) {
ret = -ENODEV;
goto out;
}
@@ -554,9 +966,9 @@ void *rds_ib_get_mr(struct scatterlist *
if (IS_ERR(ibmr))
return ibmr;
- ret = rds_ib_map_fmr(rds_ibdev, ibmr, sg, nents);
+ ret = pool->op->map(pool, ibmr, sg, nents);
if (ret == 0)
- *key_ret = ibmr->fmr->rkey;
+ *key_ret = ibmr->u.ib.fmr->rkey;
else
printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret);
Index: build-2.6/net/rds/rdma.c
===================================================================
--- build-2.6.orig/net/rds/rdma.c
+++ build-2.6/net/rds/rdma.c
@@ -116,11 +116,8 @@ static void rds_destroy_mr(struct rds_mr
mr->r_trans->free_mr(trans_private, mr->r_invalidate);
}
-static void rds_mr_put(struct rds_mr *mr)
+void __rds_put_mr_final(struct rds_mr *mr)
{
- if (!atomic_dec_and_test(&mr->r_refcount))
- return;
-
rds_destroy_mr(mr);
kfree(mr);
}
@@ -169,7 +166,7 @@ static int rds_pin_pages(unsigned long u
}
static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
- u64 *cookie_ret)
+ u64 *cookie_ret, struct rds_mr **mr_ret)
{
struct rds_mr *mr = NULL, *found;
unsigned int nr_pages;
@@ -297,6 +294,10 @@ static int __rds_rdma_map(struct rds_soc
rdsdebug("RDS: get_mr key is %x\n", mr->r_key);
+ if (mr_ret) {
+ atomic_inc(&mr->r_refcount);
+ *mr_ret = mr;
+ }
ret = 0;
out:
if (pages)
@@ -317,7 +318,7 @@ int rds_get_mr(struct rds_sock *rs, char
sizeof(struct rds_get_mr_args)))
return -EFAULT;
- return __rds_rdma_map(rs, &args, NULL);
+ return __rds_rdma_map(rs, &args, NULL, NULL);
}
/*
@@ -655,7 +656,7 @@ int rds_cmsg_rdma_dest(struct rds_sock *
if (mr) {
mr->r_trans->sync_mr(mr->r_trans_private, DMA_TO_DEVICE);
- rds_mr_put(mr);
+ rm->m_rdma_mr = mr;
}
return err;
}
@@ -673,5 +674,5 @@ int rds_cmsg_rdma_map(struct rds_sock *r
|| rm->m_rdma_cookie != 0)
return -EINVAL;
- return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie);
+ return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->m_rdma_mr);
}
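To illustrate what the new mr_ret plumbing buys us: with
RDS_CMSG_RDMA_MAP, the registration now rides along with the sendmsg()
that uses it, so the transport can defer the actual fastreg WR until
just before the message goes out. Rough userspace usage, assuming the
rds_get_mr_args layout from this tree's rds.h:

    struct rds_get_mr_args mr_args = {
        .vec         = { .addr = (uint64_t) buf, .bytes = buf_len },
        .cookie_addr = (uint64_t) &cookie,  /* r_key cookie written back here */
        .flags       = 0,
    };
    char ctl[CMSG_SPACE(sizeof(mr_args))];
    struct iovec iov = { .iov_base = payload, .iov_len = payload_len };
    struct msghdr msg = {
        .msg_name       = &sin,  /* destination sockaddr_in */
        .msg_namelen    = sizeof(sin),
        .msg_iov        = &iov,
        .msg_iovlen     = 1,
        .msg_control    = ctl,
        .msg_controllen = sizeof(ctl),
    };
    struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);

    cmsg->cmsg_level = SOL_RDS;
    cmsg->cmsg_type  = RDS_CMSG_RDMA_MAP;
    cmsg->cmsg_len   = CMSG_LEN(sizeof(mr_args));
    memcpy(CMSG_DATA(cmsg), &mr_args, sizeof(mr_args));

    sendmsg(fd, &msg, 0);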
Index: build-2.6/net/rds/rds.h
===================================================================
--- build-2.6.orig/net/rds/rds.h
+++ build-2.6/net/rds/rds.h
@@ -30,6 +30,7 @@
*/
#define RDS_IB_PORT 18635
#define RDS_TCP_PORT 18636
+#define RDS_IWARP_PORT 18637
#ifndef AF_RDS
#define AF_RDS 28 /* Reliable Datagram Socket */
@@ -60,6 +61,7 @@
/* XXX crap, we need to worry about this conflicting too */
#define SYSCTL_NET_RDS 9912
#define SYSCTL_NET_RDS_IB 100
+#define SYSCTL_NET_RDS_IWARP 101
#ifdef DEBUG
#define rdsdebug(fmt, args...) pr_debug("%s(): " fmt, __func__ , ##args)
@@ -282,6 +284,7 @@ struct rds_incoming {
#define RDS_MSG_RETRANSMITTED 5
#define RDS_MSG_MAPPED 6
#define RDS_MSG_PAGEVEC 7
+#define RDS_MSG_FASTREG_POSTED 8
struct rds_message {
atomic_t m_refcount;
@@ -301,6 +304,7 @@ struct rds_message {
struct rds_sock *m_rs;
struct rds_rdma_op *m_rdma_op;
rds_rdma_cookie_t m_rdma_cookie;
+ struct rds_mr *m_rdma_mr;
unsigned int m_nents;
unsigned int m_count;
struct scatterlist m_sg[0];
Index: build-2.6/net/rds/ib_cm.c
===================================================================
--- build-2.6.orig/net/rds/ib_cm.c
+++ build-2.6/net/rds/ib_cm.c
@@ -142,16 +142,19 @@ static void rds_ib_cm_fill_conn_param(st
struct rds_ib_connect_private *dp,
u32 protocol_version)
{
+ struct rds_ib_connection *ic = conn->c_transport_data;
+
memset(conn_param, 0, sizeof(struct rdma_conn_param));
/* XXX tune these? */
conn_param->responder_resources = 1;
conn_param->initiator_depth = 1;
- conn_param->retry_count = 7;
- conn_param->rnr_retry_count = 7;
- if (dp) {
- struct rds_ib_connection *ic = conn->c_transport_data;
+ if (!ic->i_iwarp) {
+ conn_param->retry_count = 7;
+ conn_param->rnr_retry_count = 7;
+ }
+ if (dp) {
memset(dp, 0, sizeof(*dp));
dp->dp_saddr = conn->c_laddr;
dp->dp_daddr = conn->c_faddr;
@@ -288,7 +291,7 @@ static int rds_ib_setup_qp(struct rds_co
*/
ret = rdma_create_qp(ic->i_cm_id, ic->i_pd, &attr);
if (ret) {
- rdsdebug("ib_req_notify_cq failed: %d\n", ret);
+ rdsdebug("rdma_create_qp failed: %d\n", ret);
goto out;
}
@@ -442,6 +445,12 @@ static int rds_ib_cm_handle_connect(stru
ic->i_cm_id = cm_id;
cm_id->context = conn;
+ rds_ibdev = ib_get_client_data(cm_id->device, &rds_ib_client);
+
+ /* Remember whether this is IB or iWARP */
+ ic->i_iwarp = (cm_id->device->node_type == RDMA_NODE_RNIC);
+ ic->i_fastreg = rds_ibdev->use_fastreg;
+
/* We got halfway through setting up the ib_connection, if we
* fail now, we have to take the long route out of this mess. */
destroy = 0;
@@ -462,7 +471,6 @@ static int rds_ib_cm_handle_connect(stru
}
/* update ib_device with this local ipaddr */
- rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
ib_update_ipaddr_for_device(rds_ibdev, dp->dp_saddr);
return 0;
@@ -616,6 +624,17 @@ int rds_ib_conn_connect(struct rds_conne
src.sin_addr.s_addr = (__force u32)conn->c_laddr;
src.sin_port = (__force u16)htons(0);
+ /* First, bind to the local address and device. */
+ ret = rdma_bind_addr(ic->i_cm_id, (struct sockaddr *) &src);
+ if (ret) {
+ rdsdebug("rdma_bind_addr(%u.%u.%u.%u) failed: %d\n",
+ NIPQUAD(conn->c_laddr), ret);
+ goto out;
+ }
+
+ /* Now check the device type and set i_iwarp */
+ ic->i_iwarp = (ic->i_cm_id->device->node_type == RDMA_NODE_RNIC);
+
dest.sin_family = AF_INET;
dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
dest.sin_port = (__force u16)htons(RDS_IB_PORT);
@@ -662,8 +681,9 @@ void rds_ib_conn_shutdown(struct rds_con
" cm: %p err %d\n", ic->i_cm_id, err);
}
- /* Always move the QP to error state */
- if (ic->i_cm_id->qp) {
+ /* For IB, we have to move the QP to error state.
+ * This is not needed for iWARP */
+ if (ic->i_cm_id->qp && !ic->i_iwarp) {
qp_attr.qp_state = IB_QPS_ERR;
err = ib_modify_qp(ic->i_cm_id->qp, &qp_attr, IB_QP_STATE);
if (err) {
Index: build-2.6/net/rds/ib_send.c
===================================================================
--- build-2.6.orig/net/rds/ib_send.c
+++ build-2.6/net/rds/ib_send.c
@@ -165,6 +165,8 @@ void rds_ib_send_clear_ring(struct rds_i
rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
if (send->s_op)
rds_ib_send_unmap_rdma(ic, send->s_op);
+ if (send->s_fastreg)
+ rds_ib_fastreg_release(send->s_fastreg);
}
}
@@ -195,7 +197,7 @@ void rds_ib_send_cq_comp_handler(struct
while (ib_poll_cq(cq, 1, &wc) > 0 ) {
rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
(unsigned long long)wc.wr_id, wc.status, wc.byte_len,
- be32_to_cpu(wc.imm_data));
+ be32_to_cpu(wc.ex.imm_data));
rds_ib_stats_inc(s_ib_tx_cq_event);
if (wc.wr_id == RDS_IB_ACK_WR_ID) {
@@ -223,6 +225,16 @@ void rds_ib_send_cq_comp_handler(struct
/* Nothing to be done - the SG list will be unmapped
* when the SEND completes. */
break;
+ case IB_WR_LOCAL_INV:
+ /* We invalidated an r_key. the caller may want to
+ * learn about this. */
+ if (send->s_fastreg)
+ rds_ib_local_inv_complete(send->s_fastreg, wc.status);
+ break;
+ case IB_WR_FAST_REG_MR:
+ if (send->s_fastreg)
+ rds_ib_fast_reg_complete(send->s_fastreg, wc.status);
+ break;
default:
if (printk_ratelimit())
printk(KERN_NOTICE
@@ -261,7 +273,7 @@ void rds_ib_send_cq_comp_handler(struct
* queue_delay_work will not do anything if the work
* struct is already queued, so we need to cancel it first.
*/
- cancel_delayed_work(&conn->c_send_w);
+ cancel_delayed_work(&conn->c_send_w); /* FIXME barf */
queue_delayed_work(rds_wq, &conn->c_send_w, 0);
}
@@ -490,6 +502,21 @@ int rds_ib_xmit(struct rds_connection *c
else
i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
+ /* Fastreg support */
+ if (rds_rdma_cookie_key(rm->m_rdma_cookie)
+ && ic->i_fastreg
+ && !test_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags)) {
+ ret = rds_ib_xmit_fastreg(conn, rm->m_rdma_mr);
+ if (ret)
+ goto out;
+
+ /* We don't release the fastreg yet - we can only
+ * do that when it has completed. If the connection
+ * goes down, and we re-queue the message, we would
+ * have to retry the registration. */
+ set_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags);
+ }
+
work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
if (work_alloc == 0) {
set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
@@ -849,6 +876,108 @@ out:
return ret;
}
+static int __rds_ib_xmit_fastreg(struct rds_connection *conn,
+ struct rds_ib_fastreg *frr)
+{
+ struct rds_ib_connection *ic = conn->c_transport_data;
+ struct rds_ib_send_work *send = NULL;
+ struct rds_ib_send_work *first;
+ struct ib_send_wr *failed_wr;
+ u32 pos;
+ u32 work_alloc = 0;
+ int ret;
+ int num_wrs;
+
+ /*
+ * Perform 2 WRs for the fast_reg_mr's and chain them together. The
+ * first WR is used to invalidate the old rkey, and the second WR is
+ * used to define the new fast_reg_mr request. Each individual page
+ * in the sg list is added to the fast reg page list and placed
+ * inside the fast_reg_mr WR. The key used is a rolling 8-bit
+ * counter, which should guarantee uniqueness.
+ */
+ num_wrs = 0;
+ if (frr->f_old_rkey != RDS_IB_INVALID_FASTREG_KEY)
+ num_wrs++;
+ if (frr->f_page_list)
+ num_wrs++;
+ if (!num_wrs)
+ return 0;
+
+ work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, num_wrs, &pos);
+ if (work_alloc != num_wrs) {
+ rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+ rds_ib_stats_inc(s_ib_tx_ring_full);
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ first = send = &ic->i_sends[pos];
+
+ if (frr->f_old_rkey != RDS_IB_INVALID_FASTREG_KEY) {
+ memset(send, 0, sizeof(*send));
+ send->s_wr.opcode = IB_WR_LOCAL_INV;
+ send->s_wr.ex.invalidate_rkey = frr->f_old_rkey;
+ send->s_fastreg = frr;
+ send->s_queued = jiffies;
+
+ /* Get the next WR */
+ pos = (pos + 1) % ic->i_send_ring.w_nr;
+ send = &ic->i_sends[pos];
+ }
+
+ if (frr->f_page_list) {
+ memset(send, 0, sizeof(*send));
+ send->s_wr.opcode = IB_WR_FAST_REG_MR;
+ send->s_wr.wr.fast_reg.length = frr->f_length;
+ send->s_wr.wr.fast_reg.rkey = frr->f_rkey;
+ send->s_wr.wr.fast_reg.page_list = frr->f_page_list;
+ send->s_wr.wr.fast_reg.page_list_len = frr->f_page_list_len;
+ send->s_wr.wr.fast_reg.page_shift = frr->f_page_shift;
+ send->s_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
+ IB_ACCESS_REMOTE_READ |
+ IB_ACCESS_REMOTE_WRITE;
+ send->s_fastreg = frr;
+ send->s_queued = jiffies;
+ }
+
+ atomic_add(num_wrs, &frr->f_refcnt);
+
+ /* Chain the two WRs together */
+ if (num_wrs == 2)
+ first->s_wr.next = &send->s_wr;
+
+ failed_wr = &first->s_wr;
+ ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
+
+ rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
+ first, &first->s_wr, ret, failed_wr);
+ BUG_ON(failed_wr != &first->s_wr);
+ if (ret) {
+ printk(KERN_WARNING "RDS/IB: fastreg ib_post_send to %u.%u.%u.%u "
+ "returned %d\n", NIPQUAD(conn->c_faddr), ret);
+ while (num_wrs--)
+ rds_ib_fastreg_release(frr);
+ rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+ return ret;
+ }
+
+out:
+ return ret;
+}
+
+int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr)
+{
+ struct rds_ib_fastreg *frr;
+
+ frr = rds_ib_rdma_get_fastreg(mr);
+ if (!frr)
+ return 0;
+ if (IS_ERR(frr))
+ return PTR_ERR(frr);
+ return __rds_ib_xmit_fastreg(conn, frr);
+}
+
void rds_ib_xmit_complete(struct rds_connection *conn)
{
struct rds_ib_connection *ic = conn->c_transport_data;
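For reference, the rolling 8-bit key works because
ib_update_fast_reg_key() only replaces the low byte of the lkey/rkey -
the verbs helper is roughly this (from include/rdma/ib_verbs.h):

    static inline void ib_update_fast_reg_key(struct ib_mr *mr, u8 newkey)
    {
        mr->lkey = (mr->lkey & 0xffffff00) | newkey;
        mr->rkey = (mr->rkey & 0xffffff00) | newkey;
    }

Passing ibmr->remap_count++ truncates to the low 8 bits, so a stale
r_key from the previous registration of the same MR will no longer
match once the MR has been re-registered.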
Index: build-2.6/net/rds/send.c
===================================================================
--- build-2.6.orig/net/rds/send.c
+++ build-2.6/net/rds/send.c
@@ -84,6 +84,10 @@ void rds_send_reset(struct rds_connectio
list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
+ /* If we were in the process of performing a fastreg
+ * memory registration when the connection went down,
+ * we have to retry it. */
+ clear_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags);
}
list_splice_init(&conn->c_retrans, &conn->c_send_queue);
spin_unlock_irqrestore(&conn->c_lock, flags);
@@ -765,6 +769,9 @@ static int rds_cmsg_send(struct rds_sock
if (cmsg->cmsg_level != SOL_RDS)
continue;
+ /* As a side effect, RDMA_DEST and RDMA_MAP will set
+ * rm->m_rdma_cookie and rm->m_rdma_mr.
+ */
switch (cmsg->cmsg_type) {
case RDS_CMSG_RDMA_ARGS:
ret = rds_cmsg_rdma_args(rs, rm, cmsg);
Index: build-2.6/net/rds/message.c
===================================================================
--- build-2.6.orig/net/rds/message.c
+++ build-2.6/net/rds/message.c
@@ -71,6 +71,8 @@ static void rds_message_purge(struct rds
if (rm->m_rdma_op)
rds_rdma_free_op(rm->m_rdma_op);
+ if (rm->m_rdma_mr)
+ rds_mr_put(rm->m_rdma_mr);
}
void rds_message_inc_purge(struct rds_incoming *inc)
Index: build-2.6/net/rds/rdma.h
===================================================================
--- build-2.6.orig/net/rds/rdma.h
+++ build-2.6/net/rds/rdma.h
@@ -74,4 +74,11 @@ int rds_cmsg_rdma_map(struct rds_sock *r
void rds_rdma_free_op(struct rds_rdma_op *ro);
void rds_rdma_send_complete(struct rds_message *rm, int);
+extern void __rds_put_mr_final(struct rds_mr *mr);
+static inline void rds_mr_put(struct rds_mr *mr)
+{
+ if (atomic_dec_and_test(&mr->r_refcount))
+ __rds_put_mr_final(mr);
+}
+
#endif
Index: build-2.6/net/rds/ib_recv.c
===================================================================
--- build-2.6.orig/net/rds/ib_recv.c
+++ build-2.6/net/rds/ib_recv.c
@@ -796,7 +796,7 @@ void rds_ib_recv_cq_comp_handler(struct
while (ib_poll_cq(cq, 1, &wc) > 0) {
rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
(unsigned long long)wc.wr_id, wc.status, wc.byte_len,
- be32_to_cpu(wc.imm_data));
+ be32_to_cpu(wc.ex.imm_data));
rds_ib_stats_inc(s_ib_rx_cq_event);
recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];