On Mon, Jul 07, 2008 at 02:40:45PM +0200, Olaf Kirch wrote:
> On Thursday 03 July 2008 23:34:12 Jon Mason wrote:
> > This patch adds support for running RDS over iWARP adapters.  It
> Hi Jon,
> 
> I took your patch and tried to isolate the iWARP specific changes
> in bcopy mode, and roll them into a smaller patch that doesn't duplicate
> all the ib*.[hc] files.
> 
> I also tried to come to some working solution for RDMA - as you can
> see from the deluge of messages I wrote on this :-) the approach you
> chose has some problems.
> 
> Please take a look at the attached patch and let me know whether
> (a) bcopy mode works, and (b) if the rdma approach may work with
> iwarp nics.

It doesn't seem to work.  After looking through the code, the
check_laddr will fail because it is still looking for the IB arp and not
the inet arp needed by iWARP.  I hacked around that but it is still not
working.  I'll look through the patch more and see if I can determine
what is still breaking.

Thanks,
Jon

> 
> Olaf
> 
> -- 
> Olaf Kirch  |  --- o --- Nous sommes du soleil we love when we play
> [EMAIL PROTECTED] |    / | \   sol.dhoop.naytheet.ah kin.ir.samse.qurax

> From: Olaf Kirch <[EMAIL PROTECTED]>
> Subject: [PATCH RFC] RDS: Add iWARP Support
> 
> This is based on the work posted by Jon Mason. It extracts
> the iWARP-specific changes that are needed to support bcopy
> mode (I hope I caught all of them).
> 
> I also did some work on RDMA support. This is a lot harder,
> because the interface and implementation were designed with
> classic MRs in mind. However, I think the approach taken below
> may result in a working approach (it's not working yet - I left
> some blanks and BUG() asserts in there, because I wanted to get this
> patch out as an RFC sooner rather than later).
> 
> Also, this is a pretty large patch - it needs to be broken down into
> half a dozen or so smaller functional changes for better review.
> 
> Olaf
> ---
>  net/rds/ib.c      |   30 ++
>  net/rds/ib.h      |   55 ++++
>  net/rds/ib_cm.c   |   36 ++-
>  net/rds/ib_rdma.c |  610 
> +++++++++++++++++++++++++++++++++++++++++++++---------
>  net/rds/ib_recv.c |    2 
>  net/rds/ib_send.c |  133 +++++++++++
>  net/rds/message.c |    2 
>  net/rds/rdma.c    |   17 -
>  net/rds/rdma.h    |    7 
>  net/rds/rds.h     |    4 
>  net/rds/send.c    |    7 
>  11 files changed, 778 insertions(+), 125 deletions(-)
> 
> Index: build-2.6/net/rds/ib.c
> ===================================================================
> --- build-2.6.orig/net/rds/ib.c
> +++ build-2.6/net/rds/ib.c
> @@ -42,6 +42,7 @@
>  
>  unsigned int fmr_pool_size = RDS_FMR_POOL_SIZE;
>  unsigned int fmr_message_size = RDS_FMR_SIZE + 1; /* +1 allows for unaligned 
> MRs */
> +unsigned int fastreg_pool_size = RDS_FMR_POOL_SIZE;
>  
>  module_param(fmr_pool_size, int, 0444);
>  MODULE_PARM_DESC(fmr_pool_size, " Max number of fmr per HCA");
> @@ -85,21 +86,38 @@ void rds_ib_add_one(struct ib_device *de
>       rds_ibdev->fmr_page_size  = 1 << rds_ibdev->fmr_page_shift;
>       rds_ibdev->fmr_page_mask  = ~((u64) rds_ibdev->fmr_page_size - 1);
>       rds_ibdev->fmr_max_remaps = dev_attr->max_map_per_fmr?: 32;
> -     rds_ibdev->max_fmrs = dev_attr->max_fmr?
> -                     min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) :
> -                     fmr_pool_size;
> +     rds_ibdev->max_fmrs = dev_attr->max_fmr;
>  
>       rds_ibdev->dev = device;
>       rds_ibdev->pd = ib_alloc_pd(device);
>       if (IS_ERR(rds_ibdev->pd))
>               goto free_dev;
>  
> -     rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
> -                                   IB_ACCESS_LOCAL_WRITE);
> +     if (device->node_type != RDMA_NODE_RNIC) {
> +             rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
> +                                     IB_ACCESS_LOCAL_WRITE);
> +     } else {
> +             /* Why does it have to have these permissions? */
> +             rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd,
> +                                     IB_ACCESS_REMOTE_READ |
> +                                     IB_ACCESS_REMOTE_WRITE |
> +                                     IB_ACCESS_LOCAL_WRITE);
> +     }
>       if (IS_ERR(rds_ibdev->mr))
>               goto err_pd;
>  
> -     rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
> +     /* Create the MR pool. We choose different strategies for
> +      * MRs depending on the hardware.
> +      */
> +     if (dev_attr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
> +             /* Use fast registrations */
> +             rds_ibdev->mr_pool = rds_ib_create_fastreg_pool(rds_ibdev);
> +             rds_ibdev->use_fastreg = 1;
> +     } else {
> +             /* Default: use FMRs. Would be nice if there was
> +              * a capability flag to test for. */
> +             rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
> +     }
>       if (IS_ERR(rds_ibdev->mr_pool)) {
>               rds_ibdev->mr_pool = NULL;
>               goto err_mr;
> Index: build-2.6/net/rds/ib.h
> ===================================================================
> --- build-2.6.orig/net/rds/ib.h
> +++ build-2.6/net/rds/ib.h
> @@ -49,9 +49,51 @@ struct rds_ib_connect_private {
>       __be32                  dp_credit;              /* non-zero enables 
> flow ctl */
>  };
>  
> +struct rds_ib_scatterlist {
> +     struct scatterlist *    list;
> +     unsigned int            len;
> +     int                     dma_len;
> +};
> +
> +/* We need to post a LOCAL_INV request unless f_old_rkey
> + * has this value. */
> +#define RDS_IB_INVALID_FASTREG_KEY 0
> +
> +struct rds_ib_fastreg {
> +     atomic_t                f_refcnt;
> +     unsigned int            f_posted : 1,
> +                             f_done : 1;
> +
> +     u32                     f_old_rkey;
> +
> +     u32                     f_rkey;
> +     unsigned int            f_length;
> +
> +     struct rds_ib_scatterlist f_sg;
> +
> +     struct ib_fast_reg_page_list *f_page_list;
> +     unsigned int            f_page_list_len;
> +     unsigned int            f_page_shift;
> +
> +#if 0
> +     u32                     f_invalidate_rkey;
> +     struct ib_send_wr       f_wr;
> +     wait_queue_head_t       f_waitq;
> +     struct list_head        f_list;
> +     unsigned int            f_done;
> +     int                     f_status;
> +#endif
> +
> +     struct rds_ib_mr        *f_mr;
> +};
> +
>  struct rds_ib_send_work {
>       struct rds_message      *s_rm;
> +
> +     /* We should really put these into a union: */
>       struct rds_rdma_op      *s_op;
> +     struct rds_ib_fastreg   *s_fastreg;
> +
>       struct ib_send_wr       s_wr;
>       struct ib_sge           s_sge[RDS_IB_MAX_SGE];
>       unsigned long           s_queued;
> @@ -86,6 +128,7 @@ struct rds_ib_connection {
>       struct rds_header       *i_send_hdrs;
>       u64                     i_send_hdrs_dma;
>       struct rds_ib_send_work *i_sends;
> +     struct list_head        i_fastreg_pending;
>  
>       /* rx */
>       struct mutex            i_recv_mutex;
> @@ -123,7 +166,9 @@ struct rds_ib_connection {
>       atomic_t                i_credits;
>  
>       /* Protocol version specific information */
> -     unsigned int            i_flowctl : 1;  /* enable/disable flow ctl */
> +     unsigned int            i_flowctl : 1,  /* enable/disable flow ctl */
> +                             i_iwarp   : 1,  /* this is actually iWARP not 
> IB */
> +                             i_fastreg : 1;  /* use fastreg */
>  
>       /* Batched completions */
>       unsigned int            i_unsignaled_wrs;
> @@ -154,6 +199,7 @@ struct rds_ib_device {
>       unsigned int            fmr_max_remaps;
>       unsigned int            max_fmrs;
>       int                     max_sge;
> +     unsigned int            use_fastreg : 1;
>       spinlock_t              spinlock;
>  };
>  
> @@ -236,6 +282,7 @@ extern void rds_ib_remove_one(struct ib_
>  extern struct ib_client rds_ib_client;
>  
>  extern unsigned int fmr_pool_size;
> +extern unsigned int fastreg_pool_size;
>  extern unsigned int fmr_message_size;
>  
>  /* ib_cm.c */
> @@ -254,6 +301,7 @@ void __rds_ib_conn_error(struct rds_conn
>  /* ib_rdma.c */
>  int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 
> ipaddr);
>  struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *);
> +struct rds_ib_mr_pool *rds_ib_create_fastreg_pool(struct rds_ib_device *);
>  void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct 
> rds_info_ib_connection *iinfo);
>  void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
>  void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
> @@ -261,6 +309,10 @@ void *rds_ib_get_mr(struct scatterlist *
>  void rds_ib_sync_mr(void *trans_private, int dir);
>  void rds_ib_free_mr(void *trans_private, int invalidate);
>  void rds_ib_flush_mrs(void);
> +struct rds_ib_fastreg *rds_ib_rdma_get_fastreg(struct rds_mr *);
> +void rds_ib_fastreg_release(struct rds_ib_fastreg *frr);
> +void rds_ib_local_inv_complete(struct rds_ib_fastreg *frr, int status);
> +void rds_ib_fast_reg_complete(struct rds_ib_fastreg *frr, int status);
>  
>  /* ib_recv.c */
>  int __init rds_ib_recv_init(void);
> @@ -298,6 +350,7 @@ void rds_ib_send_cq_comp_handler(struct 
>  void rds_ib_send_init_ring(struct rds_ib_connection *ic);
>  void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
>  int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op);
> +int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr);
>  void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int 
> credits);
>  void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int 
> posted);
>  int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
> Index: build-2.6/net/rds/ib_rdma.c
> ===================================================================
> --- build-2.6.orig/net/rds/ib_rdma.c
> +++ build-2.6/net/rds/ib_rdma.c
> @@ -45,20 +45,31 @@ extern struct list_head rds_ib_devices;
>  struct rds_ib_mr {
>       struct rds_ib_device    *device;
>       struct rds_ib_mr_pool   *pool;
> -     struct ib_fmr           *fmr;
> +
> +     spinlock_t              lock;
> +     union {
> +         struct {
> +             struct ib_fmr   *fmr;
> +         } ib;
> +         struct {
> +             struct ib_fast_reg_page_list *page_list;
> +             struct ib_mr    *fastreg_mr;
> +             u32             rkey;
> +             struct rds_ib_fastreg *pending;
> +         } iwarp;
> +     } u;
>       struct list_head        list;
>       unsigned int            remap_count;
>  
> -     struct scatterlist *    sg;
> -     unsigned int            sg_len;
> -     u64 *                   dma;
> -     int                     sg_dma_len;
> +     struct rds_ib_scatterlist sg;
>  };
>  
>  /*
>   * Our own little FMR pool
>   */
>  struct rds_ib_mr_pool {
> +     struct rds_ib_device *  device;
> +
>       struct mutex            flush_lock;             /* serialize fmr 
> invalidate */
>       struct work_struct      flush_worker;           /* flush worker */
>  
> @@ -68,16 +79,57 @@ struct rds_ib_mr_pool {
>       struct list_head        drop_list;              /* MRs that have 
> reached their max_maps limit */
>       struct list_head        free_list;              /* unused MRs */
>       struct list_head        clean_list;             /* unused & unmapped 
> MRs */
> +     struct list_head        fastreg_list;           /* pending fastreg's */
>       atomic_t                free_pinned;            /* memory pinned by 
> free MRs */
> +     unsigned long           max_message_size;       /* in pages */
>       unsigned long           max_items;
>       unsigned long           max_items_soft;
>       unsigned long           max_free_pinned;
>       struct ib_fmr_attr      fmr_attr;
> +
> +     /* Dummy QP used to handle invalidate for fastreg */
> +     struct ib_qp            *qp;
> +
> +     struct rds_ib_mr_pool_ops *op;
> +};
> +
> +struct rds_ib_mr_pool_ops {
> +     int                     (*init)(struct rds_ib_mr_pool *, struct 
> rds_ib_mr *);
> +     int                     (*map)(struct rds_ib_mr_pool *pool, struct 
> rds_ib_mr *ibmr,
> +                                     struct scatterlist *sg, unsigned int 
> sg_len);
> +     void                    (*unmap)(struct rds_ib_mr_pool *, struct 
> list_head *);
> +     void                    (*destroy)(struct rds_ib_mr_pool *, struct 
> rds_ib_mr *);
>  };
>  
>  static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all);
>  static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
>  static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
> +static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr 
> *ibmr);
> +static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool,
> +                       struct rds_ib_mr *ibmr,
> +                       struct scatterlist *sg, unsigned int nents);
> +static void rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool, struct 
> list_head *unmap_list);
> +static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool, struct rds_ib_mr 
> *ibmr);
> +static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool, struct rds_ib_mr 
> *ibmr);
> +static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
> +                       struct rds_ib_mr *ibmr,
> +                       struct scatterlist *sg, unsigned int nents);
> +static void rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool, struct 
> list_head *unmap_list);
> +static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool, struct 
> rds_ib_mr *ibmr);
> +
> +static struct rds_ib_mr_pool_ops rds_ib_fmr_pool_ops = {
> +     .init           = rds_ib_init_fmr,
> +     .map            = rds_ib_map_fmr,
> +     .unmap          = rds_ib_unmap_fmr_list,
> +     .destroy        = rds_ib_destroy_fmr,
> +};
> +
> +static struct rds_ib_mr_pool_ops rds_ib_fastreg_pool_ops = {
> +     .init           = rds_ib_init_fastreg,
> +     .map            = rds_ib_map_fastreg,
> +     .unmap          = rds_ib_unmap_fastreg_list,
> +     .destroy        = rds_ib_destroy_fastreg,
> +};
>  
>  int ib_update_ipaddr_for_device(struct rds_ib_device *rds_ibdev, __be32 
> ipaddr)
>  {
> @@ -124,7 +176,158 @@ struct rds_ib_device* ib_get_device(__be
>       return NULL;
>  }
>  
> -struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
> +static void rds_ib_set_scatterlist(struct rds_ib_scatterlist *sg,
> +             struct scatterlist *list,
> +             unsigned int sg_len, unsigned int sg_dma_len)
> +{
> +     sg->list = list;
> +     sg->len = sg_len;
> +     sg->dma_len = sg_dma_len;
> +}
> +
> +static void rds_ib_rdma_drop_scatterlist(struct rds_ib_device *rds_ibdev,
> +             struct rds_ib_scatterlist *sg)
> +{
> +     if (sg->dma_len) {
> +             ib_dma_unmap_sg(rds_ibdev->dev,
> +                             sg->list, sg->len,
> +                             DMA_BIDIRECTIONAL);
> +             sg->dma_len = 0;
> +     }
> +
> +     /* Release the s/g list */
> +     if (sg->len) {
> +             unsigned int i;
> +
> +             for (i = 0; i < sg->len; ++i) {
> +                     struct page *page = sg_page(&sg->list[i]);
> +
> +                     /* FIXME we need a way to tell a r/w MR
> +                      * from a r/o MR */
> +                     set_page_dirty(page);
> +                     put_page(page);
> +             }
> +             kfree(sg->list);
> +
> +             sg->list = NULL;
> +             sg->len = 0;
> +     }
> +}
> +
> +/*
> + * IB FMR handling
> + */
> +static int rds_ib_init_fmr(struct rds_ib_mr_pool *pool,
> +                             struct rds_ib_mr *ibmr)
> +{
> +     struct rds_ib_device *rds_ibdev = pool->device;
> +     struct ib_fmr *fmr;
> +
> +     fmr = ib_alloc_fmr(rds_ibdev->pd,
> +                     (IB_ACCESS_LOCAL_WRITE |
> +                      IB_ACCESS_REMOTE_READ |
> +                      IB_ACCESS_REMOTE_WRITE),
> +                     &pool->fmr_attr);
> +     if (IS_ERR(fmr)) {
> +             int err = PTR_ERR(fmr);
> +
> +             printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", 
> err);
> +             return err;
> +     }
> +
> +     ibmr->u.ib.fmr = fmr;
> +     return 0;
> +}
> +
> +static void rds_ib_unmap_fmr_list(struct rds_ib_mr_pool *pool,
> +                             struct list_head *unmap_list)
> +{
> +     struct rds_ib_mr *ibmr;
> +     LIST_HEAD(fmr_list);
> +     int ret;
> +
> +     /* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
> +     list_for_each_entry(ibmr, unmap_list, list)
> +             list_add(&ibmr->u.ib.fmr->list, &fmr_list);
> +     ret = ib_unmap_fmr(&fmr_list);
> +     if (ret)
> +             printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", 
> ret);
> +}
> +
> +static void rds_ib_destroy_fmr(struct rds_ib_mr_pool *pool,
> +                             struct rds_ib_mr *ibmr)
> +{
> +     if (ibmr->u.ib.fmr)
> +             ib_dealloc_fmr(ibmr->u.ib.fmr);
> +     ibmr->u.ib.fmr = NULL;
> +}
> +
> +/*
> + * iWARP fastreg handling
> + */
> +static int rds_ib_init_fastreg(struct rds_ib_mr_pool *pool,
> +                             struct rds_ib_mr *ibmr)
> +{
> +     struct rds_ib_device *rds_ibdev = pool->device;
> +     struct ib_mr *mr;
> +
> +     mr = ib_alloc_fast_reg_mr(rds_ibdev->pd, pool->max_message_size);
> +     if (IS_ERR(mr)) {
> +             int err = PTR_ERR(mr);
> +
> +             printk(KERN_WARNING "RDS/IWARP: ib_alloc_fast_reg_mr failed 
> (err=%d)\n", err);
> +             return err;
> +     }
> +
> +     ibmr->u.iwarp.rkey = RDS_IB_INVALID_FASTREG_KEY;
> +     ibmr->u.iwarp.fastreg_mr = mr;
> +     return 0;
> +}
> +
> +static void rds_ib_unmap_fastreg_list(struct rds_ib_mr_pool *pool,
> +                             struct list_head *unmap_list)
> +{
> +     LIST_HEAD(fmr_list);
> +
> +     /* Batched invalidation of fastreg MRs.
> +      * Why do we do it this way, even though we could pipeline unmap
> +      * and remap? The reason is the application semantics - when the
> +      * application requests an invalidation of MRs, it expects all
> +      * previously released R_Keys to become invalid.
> +      *
> +      * If we implement MR reuse naively, we risk memory corruption
> +      * (this has actually been observed). So the default behavior
> +      * requires that a MR goes through an explicit unmap operation before
> +      * we can reuse it again.
> +      *
> +      * We could probably improve on this a little, by allowing immediate
> +      * reuse of a MR on the same socket (eg you could add small
> +      * cache of unused MRs to struct rds_socket - GET_MR could grab one
> +      * of these without requiring an explicit invalidate).
> +      */
> +
> +     /* Fill in the blanks:
> +         Go through the list of dirty MRs, and post LOCAL_INV WRs to the
> +         dummy pool->qp. When the completion for the last WR arrives,
> +         the CQ handler wakes up the caller.
> +       */
> +     BUG(); /* not implemented yet. */
> +}
> +
> +static void rds_ib_destroy_fastreg(struct rds_ib_mr_pool *pool,
> +             struct rds_ib_mr *ibmr)
> +{
> +     if (ibmr->u.iwarp.page_list)
> +             ib_free_fast_reg_page_list(ibmr->u.iwarp.page_list);
> +     if (ibmr->u.iwarp.fastreg_mr)
> +             ib_dereg_mr(ibmr->u.iwarp.fastreg_mr);
> +     if (ibmr->u.iwarp.pending)
> +             rds_ib_fastreg_release(ibmr->u.iwarp.pending);
> +}
> +
> +struct rds_ib_mr_pool *__rds_ib_create_mr_pool(struct rds_ib_device 
> *rds_ibdev,
> +             unsigned int message_size, unsigned int pool_size,
> +             struct rds_ib_mr_pool_ops *ops)
>  {
>       struct rds_ib_mr_pool *pool;
>  
> @@ -132,25 +335,68 @@ struct rds_ib_mr_pool *rds_ib_create_mr_
>       if (!pool)
>               return ERR_PTR(-ENOMEM);
>  
> +     pool->device = rds_ibdev;
>       INIT_LIST_HEAD(&pool->free_list);
>       INIT_LIST_HEAD(&pool->drop_list);
>       INIT_LIST_HEAD(&pool->clean_list);
> +     INIT_LIST_HEAD(&pool->fastreg_list);
>       mutex_init(&pool->flush_lock);
>       spin_lock_init(&pool->list_lock);
>       INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
>  
> -     pool->fmr_attr.max_pages = fmr_message_size;
> -     pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
> -     pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
> -     pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4;
> +     pool->max_message_size = message_size;
> +     pool->max_items = pool_size;
> +     pool->max_free_pinned = pool->max_items * pool->max_message_size / 4;
>  
>       /* We never allow more than max_items MRs to be allocated.
>        * When we exceed more than max_items_soft, we start freeing
>        * items more aggressively.
>        * Make sure that max_items > max_items_soft > max_items / 2
>        */
> -     pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
> -     pool->max_items = rds_ibdev->max_fmrs;
> +     pool->max_items_soft = pool->max_items * 3 / 4;
> +
> +     return pool;
> +}
> +
> +struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
> +{
> +     struct rds_ib_mr_pool *pool;
> +     unsigned int pool_size = fmr_pool_size;
> +
> +     if (rds_ibdev->max_fmrs && rds_ibdev->max_fmrs < pool_size)
> +             pool_size = rds_ibdev->max_fmrs;
> +
> +     pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size, pool_size,
> +                             &rds_ib_fmr_pool_ops);
> +
> +     if (!IS_ERR(pool)) {
> +             pool->fmr_attr.max_pages = pool->max_message_size;
> +             pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
> +             pool->fmr_attr.page_shift = rds_ibdev->fmr_page_shift;
> +     }
> +
> +     return pool;
> +}
> +
> +struct rds_ib_mr_pool *rds_ib_create_fastreg_pool(struct rds_ib_device 
> *rds_ibdev)
> +{
> +     struct rds_ib_mr_pool *pool;
> +     unsigned int pool_size = fmr_pool_size;
> +
> +     if (rds_ibdev->max_fmrs && rds_ibdev->max_fmrs < pool_size)
> +             pool_size = rds_ibdev->max_fmrs;
> +
> +     pool = __rds_ib_create_mr_pool(rds_ibdev, fmr_message_size,
> +                             fastreg_pool_size,
> +                             &rds_ib_fastreg_pool_ops);
> +
> +     if (!IS_ERR(pool)) {
> +             /* Fill in the blanks:
> +              *  create a dummy QP to which we can post LOCAL_INV
> +              *  requests when invalidating MRs
> +              */
> +             pool->qp = NULL;
> +     }
>  
>       return pool;
>  }
> @@ -169,6 +415,10 @@ void rds_ib_destroy_mr_pool(struct rds_i
>       rds_ib_flush_mr_pool(pool, 1);
>       BUG_ON(atomic_read(&pool->item_count));
>       BUG_ON(atomic_read(&pool->free_pinned));
> +
> +     if (pool->qp)
> +             ib_destroy_qp(pool->qp);
> +
>       kfree(pool);
>  }
>  
> @@ -227,77 +477,82 @@ static struct rds_ib_mr *rds_ib_alloc_fm
>               goto out_no_cigar;
>       }
>  
> -     ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
> -                     (IB_ACCESS_LOCAL_WRITE |
> -                      IB_ACCESS_REMOTE_READ |
> -                      IB_ACCESS_REMOTE_WRITE),
> -                     &pool->fmr_attr);
> -     if (IS_ERR(ibmr->fmr)) {
> -             err = PTR_ERR(ibmr->fmr);
> -             ibmr->fmr = NULL;
> -             printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", 
> err);
> +     spin_lock_init(&ibmr->lock);
> +
> +     err = pool->op->init(pool, ibmr);
> +     if (err)
>               goto out_no_cigar;
> -     }
>  
>       rds_ib_stats_inc(s_ib_rdma_mr_alloc);
>       return ibmr;
>  
>  out_no_cigar:
>       if (ibmr) {
> -             if (ibmr->fmr)
> -                     ib_dealloc_fmr(ibmr->fmr);
> +             pool->op->destroy(pool, ibmr);
>               kfree(ibmr);
>       }
>       atomic_dec(&pool->item_count);
>       return ERR_PTR(err);
>  }
>  
> -static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr 
> *ibmr,
> -            struct scatterlist *sg, unsigned int nents)
> +static int rds_ib_count_dma_pages(struct rds_ib_device *rds_ibdev,
> +                       struct scatterlist *sg, unsigned int sg_dma_len,
> +                       unsigned int *lenp)
>  {
>       struct ib_device *dev = rds_ibdev->dev;
> -     struct scatterlist *scat = sg;
> -     u64 io_addr = 0;
> -     u64 *dma_pages;
> -     u32 len;
> -     int page_cnt, sg_dma_len;
> -     int i, j;
> -     int ret;
> -
> -     sg_dma_len = ib_dma_map_sg(dev, sg, nents,
> -                              DMA_BIDIRECTIONAL);
> -     if (unlikely(!sg_dma_len)) {
> -             printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
> -             return -EBUSY;
> -     }
> -
> -     len = 0;
> -     page_cnt = 0;
> +     unsigned int i, page_cnt = 0, len = 0;
>  
>       for (i = 0; i < sg_dma_len; ++i) {
> -             unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
> -             u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
> +             unsigned int dma_len = ib_sg_dma_len(dev, &sg[i]);
> +             u64 dma_addr = ib_sg_dma_address(dev, &sg[i]);
>       
>               if (dma_addr & ~rds_ibdev->fmr_page_mask) {
>                       if (i > 0)
>                               return -EINVAL;
> -                     else
> -                             ++page_cnt;
> +                     ++page_cnt;
>               }
>               if ((dma_addr + dma_len) & ~rds_ibdev->fmr_page_mask) {
>                       if (i < sg_dma_len - 1)
>                               return -EINVAL;
> -                     else
> -                             ++page_cnt;
> +                     ++page_cnt;
>               }
>  
>               len += dma_len;
>       }
>  
>       page_cnt += len >> rds_ibdev->fmr_page_shift;
> -     if (page_cnt > fmr_message_size)
> +     if (page_cnt > rds_ibdev->mr_pool->max_message_size)
>               return -EINVAL;
>  
> +     return page_cnt;
> +}
> +
> +static int rds_ib_map_fmr(struct rds_ib_mr_pool *pool,
> +                       struct rds_ib_mr *ibmr,
> +                       struct scatterlist *sg, unsigned int nents)
> +{
> +     struct rds_ib_device *rds_ibdev = pool->device;
> +     struct ib_device *dev = rds_ibdev->dev;
> +     struct scatterlist *scat = sg;
> +     u64 io_addr = 0;
> +     u64 *dma_pages;
> +     int page_cnt, sg_dma_len;
> +     int i, j;
> +     int ret;
> +
> +     sg_dma_len = ib_dma_map_sg(dev, sg, nents,
> +                              DMA_BIDIRECTIONAL);
> +     if (unlikely(!sg_dma_len)) {
> +             printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
> +             return -EBUSY;
> +     }
> +
> +     /* FIXME: when returning an error, we need to unmap the SG */
> +
> +     page_cnt = rds_ib_count_dma_pages(rds_ibdev, sg, sg_dma_len, NULL);
> +     if (page_cnt < 0)
> +             return page_cnt;
> +
>       dma_pages = kmalloc(sizeof(u64) * page_cnt, GFP_ATOMIC);
>       if (!dma_pages)
>               return -ENOMEM;
> @@ -312,7 +567,7 @@ static int rds_ib_map_fmr(struct rds_ib_
>                               (dma_addr & rds_ibdev->fmr_page_mask) + j;
>       }
>                               
> -     ret = ib_map_phys_fmr(ibmr->fmr,
> +     ret = ib_map_phys_fmr(ibmr->u.ib.fmr,
>                                  dma_pages, page_cnt, io_addr);       
>       if (ret)
>               goto out;
> @@ -321,9 +576,9 @@ static int rds_ib_map_fmr(struct rds_ib_
>        * safely tear down the old mapping. */
>       rds_ib_teardown_mr(ibmr);
>  
> -     ibmr->sg = scat;
> -     ibmr->sg_len = nents;
> -     ibmr->sg_dma_len = sg_dma_len;
> +     ibmr->sg.list = scat;
> +     ibmr->sg.len = nents;
> +     ibmr->sg.dma_len = sg_dma_len;
>       ibmr->remap_count++;
>  
>       rds_ib_stats_inc(s_ib_rdma_mr_used);
> @@ -335,6 +590,192 @@ out:
>       return ret;
>  }
>  
> +static int rds_ib_map_fastreg(struct rds_ib_mr_pool *pool,
> +                     struct rds_ib_mr *ibmr,
> +                     struct scatterlist *sg, unsigned int sg_len)
> +{
> +     struct rds_ib_device *rds_ibdev = pool->device;
> +     struct ib_device *dev = rds_ibdev->dev;
> +     struct ib_fast_reg_page_list *page_list = NULL;
> +     struct rds_ib_fastreg *frr;
> +     unsigned int len;
> +     int i, j, page_cnt, sg_dma_len = 0;
> +     int ret;
> +
> +     BUG_ON(ibmr->u.iwarp.pending);
> +
> +     page_list = ib_alloc_fast_reg_page_list(rds_ibdev->dev, 
> pool->max_message_size);
> +     if (IS_ERR(page_list)) {
> +             ret = PTR_ERR(page_list);
> +             page_list = NULL;
> +
> +             printk(KERN_WARNING "RDS/iWARP: ib_alloc_fast_reg_page_list 
> failed (err=%d)\n", ret);
> +             return ret;
> +     }
> +
> +     sg_dma_len = ib_dma_map_sg(dev, sg, sg_len, DMA_BIDIRECTIONAL);
> +     if (unlikely(!sg_dma_len)) {
> +             printk(KERN_WARNING "RDS/iWARP: dma_map_sg failed!\n");
> +             ret = -EBUSY;
> +             goto out;
> +     }
> +
> +     page_cnt = rds_ib_count_dma_pages(rds_ibdev, sg, sg_dma_len, &len);
> +     if (page_cnt < 0) {
> +             ret = page_cnt;
> +             goto out;
> +     }
> +
> +     page_cnt = 0;
> +     for (i = 0; i < sg_dma_len; ++i) {
> +             unsigned int dma_len = ib_sg_dma_len(dev, &sg[i]);
> +             u64 dma_addr = ib_sg_dma_address(dev, &sg[i]);
> +     
> +             for (j = 0; j < dma_len; j += rds_ibdev->fmr_page_size)
> +                     page_list->page_list[page_cnt++] =
> +                             (dma_addr & rds_ibdev->fmr_page_mask) + j;
> +     }
> +
> +     /* Allocate the fastreg request structure */
> +     frr = kzalloc(sizeof(*frr), GFP_KERNEL);
> +     if (!frr) {
> +             ret = -ENOMEM;
> +             goto out;
> +     }
> +
> +     ib_update_fast_reg_key(ibmr->u.iwarp.fastreg_mr, ibmr->remap_count++);
> +
> +     /* Build the fastreg WR */
> +     frr->f_mr = ibmr;
> +     rds_ib_set_scatterlist(&frr->f_sg, sg, sg_len, sg_dma_len);
> +     frr->f_length = len;
> +     frr->f_rkey = ibmr->u.iwarp.fastreg_mr->rkey;
> +     frr->f_page_list = page_list;
> +     frr->f_page_list_len = sg_dma_len;
> +     frr->f_page_shift = rds_ibdev->fmr_page_shift;
> +
> +     frr->f_old_rkey = ibmr->u.iwarp.rkey;
> +
> +     /* Attach the fastreg info to the MR */
> +     atomic_set(&frr->f_refcnt, 1);
> +     ibmr->u.iwarp.pending = frr;
> +
> +     rds_ib_stats_inc(s_ib_rdma_mr_used);
> +     ret = 0;
> +
> +out:
> +     if (ret) {
> +             ib_free_fast_reg_page_list(page_list);
> +             if (sg_dma_len)
> +                     ib_dma_unmap_sg(dev, sg, sg_dma_len, DMA_BIDIRECTIONAL);
> +     }
> +
> +     return ret;
> +}
> +
> +struct rds_ib_fastreg *rds_ib_rdma_get_fastreg(struct rds_mr *mr)
> +{
> +     struct rds_ib_mr *ibmr = mr->r_trans_private;
> +     struct rds_ib_fastreg *frr;
> +     unsigned long flags;
> +
> +     spin_lock_irqsave(&ibmr->lock, flags);
> +     frr = ibmr->u.iwarp.pending;
> +     if (frr) {
> +             /* FIXME: we need to mark the frr as "locked"
> +              * to prevent FREE_MR from trashing the MR
> +              * as long as the fastreg is on the queue */
> +             atomic_inc(&frr->f_refcnt);
> +     }
> +     spin_unlock_irqrestore(&ibmr->lock, flags);
> +
> +     return frr;
> +}
> +
> +void rds_ib_fastreg_release(struct rds_ib_fastreg *frr)
> +{
> +     struct rds_ib_device *rds_ibdev = NULL;
> +
> +     if (atomic_dec_and_test(&frr->f_refcnt)) {
> +             ib_free_fast_reg_page_list(frr->f_page_list);
> +             BUG(); /* FIXME: obtain rds_ibdev */
> +             rds_ib_rdma_drop_scatterlist(rds_ibdev, &frr->f_sg);
> +             kfree(frr);
> +     }
> +}
> +
> +/*
> + * These functions are called back from the send CQ handler
> + * when the LOCAL_INV or FAST_REG_MR WRs complete.
> + */
> +void rds_ib_local_inv_complete(struct rds_ib_fastreg *frr, int status)
> +{
> +     struct rds_ib_mr *ibmr = frr->f_mr;
> +
> +     spin_lock(&ibmr->lock);
> +     if (ibmr->u.iwarp.pending != frr)
> +             goto out_unlock;
> +
> +     if (status != IB_WC_SUCCESS) {
> +             /* Yikes. Invalidation failed. What can we do but complain? */
> +             printk(KERN_NOTICE "RDS/iWARP: Unable to invalidate fastreg MR.\n");
> +             goto out_unlock;
> +     }
> +
> +     if (frr->f_old_rkey == ibmr->u.iwarp.rkey) {
> +             ibmr->u.iwarp.rkey = 0;
> +             /* Now we can unpin any memory pinned for this MR. */
> +             rds_ib_teardown_mr(ibmr);
> +     }
> +     frr->f_old_rkey = RDS_IB_INVALID_FASTREG_KEY;
> +
> +out_unlock:
> +     spin_unlock(&ibmr->lock);
> +
> +     /* The WR owned a reference to this frr. Drop it */
> +     rds_ib_fastreg_release(frr);
> +}
> +
> +void rds_ib_fast_reg_complete(struct rds_ib_fastreg *frr, int status)
> +{
> +     struct rds_ib_mr *ibmr = frr->f_mr;
> +
> +     spin_lock(&ibmr->lock);
> +
> +     /* Technically, this would be a bug */
> +     if (ibmr->u.iwarp.pending != frr)
> +             goto out_unlock;
> +
> +     if (status != IB_WC_SUCCESS) {
> +             /* Yikes. We were unable to register the application's
> +              * memory. We have no way of notifying the application.
> +              * We could probably tear down the QP and cry uncle, but
> +              * the SEND may already have gone out.
> +              * The only solace is that the RDMA initiated by the remote
> +              * will fail, because the key isn't valid.
> +              */
> +             if (printk_ratelimit())
> +                     printk(KERN_NOTICE "RDS/iWARP: Unable to "
> +                                     "perform fast memory registration.\n");
> +             goto out_unlock;
> +     }
> +
> +     ibmr->sg = frr->f_sg;
> +     ibmr->u.iwarp.page_list = frr->f_page_list;
> +     ibmr->u.iwarp.rkey = frr->f_rkey;
> +
> +     /* Detach frr from MR. We still have at least one ref after this */
> +     ibmr->u.iwarp.pending = NULL;
> +     rds_ib_fastreg_release(frr);
> +     frr->f_done = 1;
> +
> +out_unlock:
> +     spin_unlock(&ibmr->lock);
> +
> +     /* The WR owned a reference to this frr. Drop it */
> +     rds_ib_fastreg_release(frr);
> +}
> +
>  void rds_ib_sync_mr(void *trans_private, int direction)
>  {
>       struct rds_ib_mr *ibmr = trans_private;
> @@ -342,49 +783,24 @@ void rds_ib_sync_mr(void *trans_private,
>  
>       switch (direction) {
>       case DMA_FROM_DEVICE:
> -             ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg,
> -                     ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
> +             ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg.list,
> +                     ibmr->sg.dma_len, DMA_BIDIRECTIONAL);
>               break;
>       case DMA_TO_DEVICE:
> -             ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg,
> -                     ibmr->sg_dma_len, DMA_BIDIRECTIONAL);
> +             ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg.list,
> +                     ibmr->sg.dma_len, DMA_BIDIRECTIONAL);
>               break;
>       }
>  }
>  
>  static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
>  {
> -     struct rds_ib_device *rds_ibdev = ibmr->device;
> -
> -     if (ibmr->sg_dma_len) {
> -             ib_dma_unmap_sg(rds_ibdev->dev,
> -                             ibmr->sg, ibmr->sg_len,
> -                             DMA_BIDIRECTIONAL);
> -             ibmr->sg_dma_len = 0;
> -     }
> -
> -     /* Release the s/g list */
> -     if (ibmr->sg_len) {
> -             unsigned int i;
> -
> -             for (i = 0; i < ibmr->sg_len; ++i) {
> -                     struct page *page = sg_page(&ibmr->sg[i]);
> -
> -                     /* FIXME we need a way to tell a r/w MR
> -                      * from a r/o MR */
> -                     set_page_dirty(page);
> -                     put_page(page);
> -             }
> -             kfree(ibmr->sg);
> -
> -             ibmr->sg = NULL;
> -             ibmr->sg_len = 0;
> -     }
> +     rds_ib_rdma_drop_scatterlist(ibmr->device, &ibmr->sg);
>  }
>  
>  void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
>  {
> -     unsigned int pinned = ibmr->sg_len;
> +     unsigned int pinned = ibmr->sg.len;
>  
>       __rds_ib_teardown_mr(ibmr);
>       if (pinned) {
> @@ -419,7 +835,6 @@ int rds_ib_flush_mr_pool(struct rds_ib_m
>  {
>       struct rds_ib_mr *ibmr, *next;
>       LIST_HEAD(unmap_list);
> -     LIST_HEAD(fmr_list);
>       unsigned long unpinned = 0;
>       unsigned long flags;
>       unsigned int nfreed = 0, ncleaned = 0, free_goal;
> @@ -443,21 +858,17 @@ int rds_ib_flush_mr_pool(struct rds_ib_m
>       if (list_empty(&unmap_list))
>               goto out;
>  
> -     /* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
> -     list_for_each_entry(ibmr, &unmap_list, list)
> -             list_add(&ibmr->fmr->list, &fmr_list);
> -     ret = ib_unmap_fmr(&fmr_list);
> -     if (ret)
> -             printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
> +     /* Batched invalidate of dirty MRs: */
> +     pool->op->unmap(pool, &unmap_list);
>  
>       /* Now we can destroy the DMA mapping and unpin any pages */
>       list_for_each_entry_safe(ibmr, next, &unmap_list, list) {
> -             unpinned += ibmr->sg_len;
> +             unpinned += ibmr->sg.len;
>               __rds_ib_teardown_mr(ibmr);
>               if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
>                       rds_ib_stats_inc(s_ib_rdma_mr_free);
>                       list_del(&ibmr->list);
> -                     ib_dealloc_fmr(ibmr->fmr);
> +                     pool->op->destroy(pool, ibmr);
>                       kfree(ibmr);
>                       nfreed++;
>               }
> @@ -491,7 +902,7 @@ void rds_ib_free_mr(void *trans_private,
>       struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
>       unsigned long flags;
>  
> -     rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
> +     rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg.len);
>  
>       /* Return it to the pool's free list */
>       spin_lock_irqsave(&pool->list_lock, flags);
> @@ -500,7 +911,7 @@ void rds_ib_free_mr(void *trans_private,
>       } else {
>               list_add(&ibmr->list, &pool->free_list);
>       }
> -     atomic_add(ibmr->sg_len, &pool->free_pinned);
> +     atomic_add(ibmr->sg.len, &pool->free_pinned);
>       atomic_inc(&pool->dirty_count);
>       spin_unlock_irqrestore(&pool->list_lock, flags);
>  
> @@ -536,6 +947,7 @@ void *rds_ib_get_mr(struct scatterlist *
>                   __be32 ip_addr, u32 *key_ret)
>  {
>       struct rds_ib_device *rds_ibdev;
> +     struct rds_ib_mr_pool *pool;
>       struct rds_ib_mr *ibmr = NULL;
>       int ret;
>  
> @@ -545,7 +957,7 @@ void *rds_ib_get_mr(struct scatterlist *
>               goto out;
>       }
>  
> -     if (!rds_ibdev->mr_pool) {
> +     if (!(pool = rds_ibdev->mr_pool)) {
>               ret = -ENODEV;
>               goto out;
>       }
> @@ -554,9 +966,9 @@ void *rds_ib_get_mr(struct scatterlist *
>       if (IS_ERR(ibmr))
>               return ibmr;
>  
> -     ret = rds_ib_map_fmr(rds_ibdev, ibmr, sg, nents);
> +     ret = pool->op->map(pool, ibmr, sg, nents);
>       if (ret == 0)
> -             *key_ret = ibmr->fmr->rkey;
> +             *key_ret = ibmr->u.ib.fmr->rkey;
>       else
>               printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret);
>  
> Index: build-2.6/net/rds/rdma.c
> ===================================================================
> --- build-2.6.orig/net/rds/rdma.c
> +++ build-2.6/net/rds/rdma.c
> @@ -116,11 +116,8 @@ static void rds_destroy_mr(struct rds_mr
>               mr->r_trans->free_mr(trans_private, mr->r_invalidate);
>  }
>  
> -static void rds_mr_put(struct rds_mr *mr)
> +void __rds_put_mr_final(struct rds_mr *mr)
>  {
> -     if (!atomic_dec_and_test(&mr->r_refcount))
> -             return;
> -
>       rds_destroy_mr(mr);
>       kfree(mr);
>  }
> @@ -169,7 +166,7 @@ static int rds_pin_pages(unsigned long u
>  }
>  
>  static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
> -                             u64 *cookie_ret)
> +                             u64 *cookie_ret, struct rds_mr **mr_ret)
>  {
>       struct rds_mr *mr = NULL, *found;
>       unsigned int nr_pages;
> @@ -297,6 +294,10 @@ static int __rds_rdma_map(struct rds_soc
>  
>       rdsdebug("RDS: get_mr key is %x\n", mr->r_key);
>  
> +     if (mr_ret) {
> +             atomic_inc(&mr->r_refcount);
> +             *mr_ret = mr;
> +     }
>       ret = 0;
>  out:
>       if (pages)
> @@ -317,7 +318,7 @@ int rds_get_mr(struct rds_sock *rs, char
>                          sizeof(struct rds_get_mr_args)))
>               return -EFAULT;
>  
> -     return __rds_rdma_map(rs, &args, NULL);
> +     return __rds_rdma_map(rs, &args, NULL, NULL);
>  }
>  
>  /*
> @@ -655,7 +656,7 @@ int rds_cmsg_rdma_dest(struct rds_sock *
>  
>       if (mr) {
>               mr->r_trans->sync_mr(mr->r_trans_private, DMA_TO_DEVICE);
> -             rds_mr_put(mr);
> +             rm->m_rdma_mr = mr;
>       }
>       return err;
>  }
> @@ -673,5 +674,5 @@ int rds_cmsg_rdma_map(struct rds_sock *r
>        || rm->m_rdma_cookie != 0)
>               return -EINVAL;
>  
> -     return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie);
> +     return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->m_rdma_mr);
>  }
> Index: build-2.6/net/rds/rds.h
> ===================================================================
> --- build-2.6.orig/net/rds/rds.h
> +++ build-2.6/net/rds/rds.h
> @@ -30,6 +30,7 @@
>   */
>  #define RDS_IB_PORT  18635
>  #define RDS_TCP_PORT 18636
> +#define RDS_IWARP_PORT       18637
>  
>  #ifndef AF_RDS
>  #define AF_RDS          28      /* Reliable Datagram Socket     */
> @@ -60,6 +61,7 @@
>  /* XXX crap, we need to worry about this conflicting too */
>  #define SYSCTL_NET_RDS 9912
>  #define SYSCTL_NET_RDS_IB 100
> +#define SYSCTL_NET_RDS_IWARP 101
>  
>  #ifdef DEBUG
>  #define rdsdebug(fmt, args...) pr_debug("%s(): " fmt, __func__ , ##args)
> @@ -282,6 +284,7 @@ struct rds_incoming {
>  #define RDS_MSG_RETRANSMITTED        5
>  #define RDS_MSG_MAPPED               6
>  #define RDS_MSG_PAGEVEC              7
> +#define RDS_MSG_FASTREG_POSTED       8
>  
>  struct rds_message {
>       atomic_t                m_refcount;
> @@ -301,6 +304,7 @@ struct rds_message {
>       struct rds_sock         *m_rs;
>       struct rds_rdma_op      *m_rdma_op;
>       rds_rdma_cookie_t       m_rdma_cookie;
> +     struct rds_mr           *m_rdma_mr;
>       unsigned int            m_nents;
>       unsigned int            m_count;
>       struct scatterlist      m_sg[0];
> Index: build-2.6/net/rds/ib_cm.c
> ===================================================================
> --- build-2.6.orig/net/rds/ib_cm.c
> +++ build-2.6/net/rds/ib_cm.c
> @@ -142,16 +142,19 @@ static void rds_ib_cm_fill_conn_param(st
>                       struct rds_ib_connect_private *dp,
>                       u32 protocol_version)
>  {
> +     struct rds_ib_connection *ic = conn->c_transport_data;
> +
>       memset(conn_param, 0, sizeof(struct rdma_conn_param));
>       /* XXX tune these? */
>       conn_param->responder_resources = 1;
>       conn_param->initiator_depth = 1;
> -     conn_param->retry_count = 7;
> -     conn_param->rnr_retry_count = 7;
>  
> -     if (dp) {
> -             struct rds_ib_connection *ic = conn->c_transport_data;
> +     if (!ic->i_iwarp) {
> +             conn_param->retry_count = 7;
> +             conn_param->rnr_retry_count = 7;
> +     }
>  
> +     if (dp) {
>               memset(dp, 0, sizeof(*dp));
>               dp->dp_saddr = conn->c_laddr;
>               dp->dp_daddr = conn->c_faddr;
> @@ -288,7 +291,7 @@ static int rds_ib_setup_qp(struct rds_co
>        */
>       ret = rdma_create_qp(ic->i_cm_id, ic->i_pd, &attr);
>       if (ret) {
> -             rdsdebug("ib_req_notify_cq failed: %d\n", ret);
> +             rdsdebug("rdma_create_qp failed: %d\n", ret);
>               goto out;
>       }
>  
> @@ -442,6 +445,12 @@ static int rds_ib_cm_handle_connect(stru
>       ic->i_cm_id = cm_id;
>       cm_id->context = conn;
>  
> +     rds_ibdev = ib_get_client_data(cm_id->device, &rds_ib_client);
> +
> +     /* Remember whether this is IB or iWARP */
> +     ic->i_iwarp = (cm_id->device->node_type == RDMA_NODE_RNIC);
> +     ic->i_fastreg = rds_ibdev->use_fastreg;
> +
>       /* We got halfway through setting up the ib_connection, if we
>        * fail now, we have to take the long route out of this mess. */
>       destroy = 0;
> @@ -462,7 +471,6 @@ static int rds_ib_cm_handle_connect(stru
>       }
>  
>       /* update ib_device with this local ipaddr */
> -     rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
>       ib_update_ipaddr_for_device(rds_ibdev, dp->dp_saddr);
>  
>       return 0;
> @@ -616,6 +624,17 @@ int rds_ib_conn_connect(struct rds_conne
>       src.sin_addr.s_addr = (__force u32)conn->c_laddr;
>       src.sin_port = (__force u16)htons(0);
>  
> +     /* First, bind to the local address and device. */
> +     ret = rdma_bind_addr(ic->i_cm_id, (struct sockaddr *) &src);
> +     if (ret) {
> +             rdsdebug("rdma_bind_addr(%u.%u.%u.%u) failed: %d\n",
> +                             NIPQUAD(conn->c_laddr), ret);
> +             goto out;
> +     }
> +
> +     /* Now check the device type and set i_iwarp */
> +     ic->i_iwarp = (ic->i_cm_id->device->node_type == RDMA_NODE_RNIC);
> +
>       dest.sin_family = AF_INET;
>       dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
>       dest.sin_port = (__force u16)htons(RDS_IB_PORT);
> @@ -662,8 +681,9 @@ void rds_ib_conn_shutdown(struct rds_con
>                                  " cm: %p err %d\n", ic->i_cm_id, err);
>               }
>  
> -             /* Always move the QP to error state */
> -             if (ic->i_cm_id->qp) {
> +             /* For IB, we have to move the QP to error state.
> +              * This is not needed for iWARP */
> +             if (ic->i_cm_id->qp && !ic->i_iwarp) {
>                       qp_attr.qp_state = IB_QPS_ERR;
>                       err = ib_modify_qp(ic->i_cm_id->qp, &qp_attr, IB_QP_STATE);
>                       if (err) {
> Index: build-2.6/net/rds/ib_send.c
> ===================================================================
> --- build-2.6.orig/net/rds/ib_send.c
> +++ build-2.6/net/rds/ib_send.c
> @@ -165,6 +165,8 @@ void rds_ib_send_clear_ring(struct rds_i
>                       rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
>               if (send->s_op)
>                       rds_ib_send_unmap_rdma(ic, send->s_op);
> +             if (send->s_fastreg)
> +                     rds_ib_fastreg_release(send->s_fastreg);
>       }
>  }
>  
> @@ -195,7 +197,7 @@ void rds_ib_send_cq_comp_handler(struct 
>       while (ib_poll_cq(cq, 1, &wc) > 0 ) {
>               rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
>                        (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
> -                      be32_to_cpu(wc.imm_data));
> +                      be32_to_cpu(wc.ex.imm_data));
>               rds_ib_stats_inc(s_ib_tx_cq_event);
>  
>               if (wc.wr_id == RDS_IB_ACK_WR_ID) {
> @@ -223,6 +225,16 @@ void rds_ib_send_cq_comp_handler(struct 
>                               /* Nothing to be done - the SG list will be unmapped
>                                * when the SEND completes. */
>                               break;
> +                     case IB_WR_LOCAL_INV:
> +                             /* We invalidated an r_key. the caller may want to
> +                              * learn about this. */
> +                             if (send->s_fastreg)
> +                                     rds_ib_local_inv_complete(send->s_fastreg, wc.status);
> +                             break;
> +                     case IB_WR_FAST_REG_MR:
> +                             if (send->s_fastreg)
> +                                     rds_ib_fast_reg_complete(send->s_fastreg, wc.status);
> +                             break;
>                       default:
>                               if (printk_ratelimit())
>                                       printk(KERN_NOTICE
> @@ -261,7 +273,7 @@ void rds_ib_send_cq_comp_handler(struct 
>                        * queue_delay_work will not do anything if the work
>                        * struct is already queued, so we need to cancel it first.
>                        */
> -                     cancel_delayed_work(&conn->c_send_w);
> +                     cancel_delayed_work(&conn->c_send_w); /* FIXME barf */
>                       queue_delayed_work(rds_wq, &conn->c_send_w, 0);
>               }
>  
> @@ -490,6 +502,21 @@ int rds_ib_xmit(struct rds_connection *c
>       else
>               i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
>  
> +     /* Fastreg support */
> +     if (rds_rdma_cookie_key(rm->m_rdma_cookie)
> +      && ic->i_fastreg
> +      && !test_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags)) {
> +             ret = rds_ib_xmit_fastreg(conn, rm->m_rdma_mr);
> +             if (ret)
> +                     goto out;
> +
> +             /* We don't release the fastreg yet - we can only
> +              * do that when it has completed. If the connection
> +              * goes down, and we re-queue the message, we would
> +              * have to retry the registration. */
> +             set_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags);
> +     }
> +
>       work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos);
>       if (work_alloc == 0) {
>               set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
> @@ -849,6 +876,108 @@ out:
>       return ret;
>  }
>  
> +static int __rds_ib_xmit_fastreg(struct rds_connection *conn,
> +                              struct rds_ib_fastreg *frr)
> +{
> +     struct rds_ib_connection *ic = conn->c_transport_data;
> +     struct rds_ib_send_work *send = NULL;
> +     struct rds_ib_send_work *first;
> +     struct ib_send_wr *failed_wr;
> +     u32 pos;
> +     u32 work_alloc = 0;
> +     int ret;
> +     int num_wrs;
> +
> +     /*
> +      * Perform 2 WRs for the fast_reg_mr's and chain them together.  The
> +      * first WR is used to invalidate the old rkey, and the second WR is
> +      * used to define the new fast_reg_mr request.  Each individual page
> +      * in the sg list is added to the fast reg page list and placed
> +      * inside the fast_reg_mr WR.  The key used is a rolling 8bit
> +      * counter, which should guarantee uniqueness.
> +      */
> +     num_wrs = 0;
> +     if (frr->f_old_rkey != RDS_IB_INVALID_FASTREG_KEY)
> +             num_wrs++;
> +     if (frr->f_page_list)
> +             num_wrs++;
> +     if (!num_wrs)
> +             return 0;
> +
> +     work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, num_wrs, &pos);
> +     if (work_alloc != num_wrs) {
> +             rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
> +             rds_ib_stats_inc(s_ib_tx_ring_full);
> +             ret = -ENOMEM;
> +             goto out;
> +     }
> +
> +     first = send = &ic->i_sends[pos];
> +
> +     if (frr->f_old_rkey != RDS_IB_INVALID_FASTREG_KEY) {
> +             memset(send, 0, sizeof(*send));
> +             send->s_wr.opcode = IB_WR_LOCAL_INV;
> +             send->s_wr.ex.invalidate_rkey = frr->f_old_rkey;
> +             send->s_fastreg = frr;
> +             send->s_queued = jiffies;
> +
> +             /* Get the next WR */
> +             pos = (pos + 1) % ic->i_send_ring.w_nr;
> +             send = &ic->i_sends[pos];
> +     }
> +
> +     if (frr->f_page_list) {
> +             memset(send, 0, sizeof(*send));
> +             send->s_wr.opcode = IB_WR_FAST_REG_MR;
> +             send->s_wr.wr.fast_reg.length = frr->f_length;
> +             send->s_wr.wr.fast_reg.rkey = frr->f_rkey;
> +             send->s_wr.wr.fast_reg.page_list = frr->f_page_list;
> +             send->s_wr.wr.fast_reg.page_list_len = frr->f_page_list_len;
> +             send->s_wr.wr.fast_reg.page_shift = frr->f_page_shift;
> +             send->s_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
> +                                     IB_ACCESS_REMOTE_READ |
> +                                     IB_ACCESS_REMOTE_WRITE;
> +             send->s_fastreg = frr;
> +             send->s_queued = jiffies;
> +     }
> +
> +     atomic_add(num_wrs, &frr->f_refcnt);
> +
> +     /* Chain the two WRs together */
> +     if (num_wrs == 2)
> +             first->s_wr.next = &send->s_wr;
> +
> +     failed_wr = &first->s_wr;
> +     ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
> +
> +     rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
> +              first, &first->s_wr, ret, failed_wr);
> +     BUG_ON(failed_wr != &first->s_wr);
> +     if (ret) {
> +             printk(KERN_WARNING "RDS/IB: fastreg ib_post_send to %u.%u.%u.%u "
> +                    "returned %d\n", NIPQUAD(conn->c_faddr), ret);
> +             while (num_wrs--)
> +                     rds_ib_fastreg_release(frr);
> +             rds_ib_ring_unalloc(&ic->i_send_ring, 2);
> +             return ret;
> +     }
> +
> +out:
> +     return ret;
> +}
> +
> +int rds_ib_xmit_fastreg(struct rds_connection *conn, struct rds_mr *mr)
> +{
> +     struct rds_ib_fastreg *frr;
> +
> +     frr = rds_ib_rdma_get_fastreg(mr);
> +     if (!frr)
> +             return 0;
> +     if (IS_ERR(frr))
> +             return PTR_ERR(frr);
> +     return __rds_ib_xmit_fastreg(conn, frr);
> +}
> +
>  void rds_ib_xmit_complete(struct rds_connection *conn)
>  {
>       struct rds_ib_connection *ic = conn->c_transport_data;
> Index: build-2.6/net/rds/send.c
> ===================================================================
> --- build-2.6.orig/net/rds/send.c
> +++ build-2.6/net/rds/send.c
> @@ -84,6 +84,10 @@ void rds_send_reset(struct rds_connectio
>       list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
>               set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
>               set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
> +             /* If we were in the process of performing a fastreg
> +              * memory registration when the connection went down,
> +              * we have to retry it. */
> +             clear_bit(RDS_MSG_FASTREG_POSTED, &rm->m_flags);
>       }
>       list_splice_init(&conn->c_retrans, &conn->c_send_queue);
>       spin_unlock_irqrestore(&conn->c_lock, flags);
> @@ -765,6 +769,9 @@ static int rds_cmsg_send(struct rds_sock
>               if (cmsg->cmsg_level != SOL_RDS)
>                       continue;
>  
> +             /* As a side effect, RDMA_DEST and RDMA_MAP will set
> +              * rm->m_rdma_cookie and rm->m_rdma_mr.
> +              */
>               switch (cmsg->cmsg_type) {
>               case RDS_CMSG_RDMA_ARGS:
>                       ret = rds_cmsg_rdma_args(rs, rm, cmsg);
> Index: build-2.6/net/rds/message.c
> ===================================================================
> --- build-2.6.orig/net/rds/message.c
> +++ build-2.6/net/rds/message.c
> @@ -71,6 +71,8 @@ static void rds_message_purge(struct rds
>  
>       if (rm->m_rdma_op)
>               rds_rdma_free_op(rm->m_rdma_op);
> +     if (rm->m_rdma_mr)
> +             rds_mr_put(rm->m_rdma_mr);
>  }
>  
>  void rds_message_inc_purge(struct rds_incoming *inc)
> Index: build-2.6/net/rds/rdma.h
> ===================================================================
> --- build-2.6.orig/net/rds/rdma.h
> +++ build-2.6/net/rds/rdma.h
> @@ -74,4 +74,11 @@ int rds_cmsg_rdma_map(struct rds_sock *r
>  void rds_rdma_free_op(struct rds_rdma_op *ro);
>  void rds_rdma_send_complete(struct rds_message *rm, int);
>  
> +extern void __rds_put_mr_final(struct rds_mr *mr);
> +static inline void rds_mr_put(struct rds_mr *mr)
> +{
> +     if (atomic_dec_and_test(&mr->r_refcount))
> +             __rds_put_mr_final(mr);
> +}
> +
>  #endif
> Index: build-2.6/net/rds/ib_recv.c
> ===================================================================
> --- build-2.6.orig/net/rds/ib_recv.c
> +++ build-2.6/net/rds/ib_recv.c
> @@ -796,7 +796,7 @@ void rds_ib_recv_cq_comp_handler(struct 
>       while (ib_poll_cq(cq, 1, &wc) > 0) {
>               rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
>                        (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
> -                      be32_to_cpu(wc.imm_data));
> +                      be32_to_cpu(wc.ex.imm_data));
>               rds_ib_stats_inc(s_ib_rx_cq_event);
>  
>               recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];

_______________________________________________
general mailing list
[email protected]
http://lists.openfabrics.org/cgi-bin/mailman/listinfo/general

To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general

Reply via email to