On Mon, May 19, 2008 at 10:05:59AM +0200, Olaf Kirch wrote:
> 
> > However, I'm still seeing performance degradation of ~5% with some packet
> > sizes. And that is *just* the overhead from exchanging the credit 
> > information
> > and checking it - at some point we need to take a spinlock, and that seems
> > to delay things just enough to make a dent in my throughput graph.
> 
> Here's an updated version of the flow control patch - which is now completely
> lockless, and uses a single atomic_t to hold both credit counters. This has
> given me back close to full performance in my testing (throughput seems to be
> down less than 1%, which is almost within the noise range).
> 
> I'll push it to my git tree a little later today, so folks can test it if
> they like.

Works well on my setup.

With proper flow control, there should no longer be a need for rnr_retry (as 
there
should always be a posted recv buffer waiting for the incoming data).  I did a
quick test and removed it and everything seemed to be happy on my rds-stress 
run.

Thanks for pulling this in.
Jon

> 
> Olaf
> -- 
> Olaf Kirch  |  --- o --- Nous sommes du soleil we love when we play
> [EMAIL PROTECTED] |    / | \   sol.dhoop.naytheet.ah kin.ir.samse.qurax
> ----
> From: Olaf Kirch <[EMAIL PROTECTED]>
> Subject: RDS: Implement IB flow control
> 
> Here it is - flow control for RDS/IB.
> 
> This patch is still very much experimental. Here are the essentials:
> 
>  -    The approach chosen here uses a credit-based flow control
>       mechanism. Every SEND WR (including ACKs) consumes one credit,
>       and if the sender runs out of credits, it stalls.
> 
>  -    As new receive buffers are posted, credits are transferred to the
>       remote node (using yet another RDS header byte for this).
> 
>  -    Flow control is negotiated during connection setup. Initial credits
>       are exchanged in the rds_ib_connect_private struct - sending a value
>       of zero (which is also the default for older protocol versions)
>       means no flow control.
> 
>  -    We avoid deadlock (both nodes depleting their credits, and being
>       unable to inform the peer of newly posted buffers) by requiring
>       that the last credit can only be used if we're posting new credits
>       to the peer.
> 
> The approach implemented here is lock-free; preliminary tests show
> the impact on throughput to be less than 1%, and the impact on RTT,
> CPU, TX delay and other metrics to be below the noise threshold.
> 
> Flow control is configurable via sysctl. It only affects newly created
> connections however - so your best bet is to set this right after loading
> the RDS module.
> 
> Signed-off-by: Olaf Kirch <[EMAIL PROTECTED]>
> ---
>  net/rds/ib.c        |    1 
>  net/rds/ib.h        |   30 ++++++++
>  net/rds/ib_cm.c     |   49 ++++++++++++-
>  net/rds/ib_recv.c   |   48 +++++++++---
>  net/rds/ib_send.c   |  194 
> ++++++++++++++++++++++++++++++++++++++++++++++++++++
>  net/rds/ib_stats.c  |    3 
>  net/rds/ib_sysctl.c |   10 ++
>  net/rds/rds.h       |    4 -
>  8 files changed, 325 insertions(+), 14 deletions(-)
> 
> Index: ofa_kernel-1.3/net/rds/ib.h
> ===================================================================
> --- ofa_kernel-1.3.orig/net/rds/ib.h
> +++ ofa_kernel-1.3/net/rds/ib.h
> @@ -46,6 +46,7 @@ struct rds_ib_connect_private {
>       __be16                  dp_protocol_minor_mask; /* bitmask */
>       __be32                  dp_reserved1;
>       __be64                  dp_ack_seq;
> +     __be32                  dp_credit;              /* non-zero enables 
> flow ctl */
>  };
>  
>  struct rds_ib_send_work {
> @@ -110,15 +111,32 @@ struct rds_ib_connection {
>       struct ib_sge           i_ack_sge;
>       u64                     i_ack_dma;
>       unsigned long           i_ack_queued;
> +
> +     /* Flow control related information
> +      *
> +      * Our algorithm uses a pair variables that we need to access
> +      * atomically - one for the send credits, and one posted
> +      * recv credits we need to transfer to remote.
> +      * Rather than protect them using a slow spinlock, we put both into
> +      * a single atomic_t and update it using cmpxchg
> +      */
> +     atomic_t                i_credits;
>   
>       /* Protocol version specific information */
>       unsigned int            i_hdr_idx;      /* 1 (old) or 0 (3.1 or later) 
> */
> +     unsigned int            i_flowctl : 1;  /* enable/disable flow ctl */
>  
>       /* Batched completions */
>       unsigned int            i_unsignaled_wrs;
>       long                    i_unsignaled_bytes;
>  };
>  
> +/* This assumes that atomic_t is at least 32 bits */
> +#define IB_GET_SEND_CREDITS(v)       ((v) & 0xffff)
> +#define IB_GET_POST_CREDITS(v)       ((v) >> 16)
> +#define IB_SET_SEND_CREDITS(v)       ((v) & 0xffff)
> +#define IB_SET_POST_CREDITS(v)       ((v) << 16)
> +
>  struct rds_ib_ipaddr {
>       struct list_head        list;
>       __be32                  ipaddr;
> @@ -153,14 +171,17 @@ struct rds_ib_statistics {
>       unsigned long   s_ib_tx_cq_call;
>       unsigned long   s_ib_tx_cq_event;
>       unsigned long   s_ib_tx_ring_full;
> +     unsigned long   s_ib_tx_throttle;
>       unsigned long   s_ib_tx_sg_mapping_failure;
>       unsigned long   s_ib_tx_stalled;
> +     unsigned long   s_ib_tx_credit_updates;
>       unsigned long   s_ib_rx_cq_call;
>       unsigned long   s_ib_rx_cq_event;
>       unsigned long   s_ib_rx_ring_empty;
>       unsigned long   s_ib_rx_refill_from_cq;
>       unsigned long   s_ib_rx_refill_from_thread;
>       unsigned long   s_ib_rx_alloc_limit;
> +     unsigned long   s_ib_rx_credit_updates;
>       unsigned long   s_ib_ack_sent;
>       unsigned long   s_ib_ack_send_failure;
>       unsigned long   s_ib_ack_send_delayed;
> @@ -244,6 +265,8 @@ void rds_ib_flush_mrs(void);
>  int __init rds_ib_recv_init(void);
>  void rds_ib_recv_exit(void);
>  int rds_ib_recv(struct rds_connection *conn);
> +int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
> +                    gfp_t page_gfp, int prefill);
>  void rds_ib_inc_purge(struct rds_incoming *inc);
>  void rds_ib_inc_free(struct rds_incoming *inc);
>  int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
> @@ -252,6 +275,7 @@ void rds_ib_recv_cq_comp_handler(struct 
>  void rds_ib_recv_init_ring(struct rds_ib_connection *ic);
>  void rds_ib_recv_clear_ring(struct rds_ib_connection *ic);
>  void rds_ib_recv_init_ack(struct rds_ib_connection *ic);
> +void rds_ib_attempt_ack(struct rds_ib_connection *ic);
>  void rds_ib_ack_send_complete(struct rds_ib_connection *ic);
>  u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic);
>  
> @@ -266,12 +290,17 @@ u32 rds_ib_ring_completed(struct rds_ib_
>  extern wait_queue_head_t rds_ib_ring_empty_wait;
>  
>  /* ib_send.c */
> +void rds_ib_xmit_complete(struct rds_connection *conn);
>  int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
>               unsigned int hdr_off, unsigned int sg, unsigned int off);
>  void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context);
>  void rds_ib_send_init_ring(struct rds_ib_connection *ic);
>  void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
>  int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op);
> +void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int 
> credits);
> +void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int 
> posted);
> +int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
> +                          u32 *adv_credits);
>  
>  /* ib_stats.c */
>  RDS_DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
> @@ -287,6 +316,7 @@ extern unsigned long rds_ib_sysctl_max_r
>  extern unsigned long rds_ib_sysctl_max_unsig_wrs;
>  extern unsigned long rds_ib_sysctl_max_unsig_bytes;
>  extern unsigned long rds_ib_sysctl_max_recv_allocation;
> +extern unsigned int rds_ib_sysctl_flow_control;
>  extern ctl_table rds_ib_sysctl_table[];
>  
>  /*
> Index: ofa_kernel-1.3/net/rds/ib_cm.c
> ===================================================================
> --- ofa_kernel-1.3.orig/net/rds/ib_cm.c
> +++ ofa_kernel-1.3/net/rds/ib_cm.c
> @@ -55,6 +55,22 @@ static void rds_ib_set_protocol(struct r
>  }
>  
>  /*
> + * Set up flow control
> + */
> +static void rds_ib_set_flow_control(struct rds_connection *conn, u32 credits)
> +{
> +     struct rds_ib_connection *ic = conn->c_transport_data;
> +
> +     if (rds_ib_sysctl_flow_control && credits != 0) {
> +             /* We're doing flow control */
> +             ic->i_flowctl = 1;
> +             rds_ib_send_add_credits(conn, credits);
> +     } else {
> +             ic->i_flowctl = 0;
> +     }
> +}
> +
> +/*
>   * Connection established.
>   * We get here for both outgoing and incoming connection.
>   */
> @@ -72,12 +88,16 @@ static void rds_ib_connect_complete(stru
>               rds_ib_set_protocol(conn,
>                               RDS_PROTOCOL(dp->dp_protocol_major,
>                                       dp->dp_protocol_minor));
> +             rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
>       }
>  
> -     rdsdebug("RDS/IB: ib conn complete on %u.%u.%u.%u version %u.%u\n",
> +     printk(KERN_NOTICE "RDS/IB: connected to %u.%u.%u.%u version %u.%u%s\n",
>                       NIPQUAD(conn->c_laddr),
>                       RDS_PROTOCOL_MAJOR(conn->c_version),
> -                     RDS_PROTOCOL_MINOR(conn->c_version));
> +                     RDS_PROTOCOL_MINOR(conn->c_version),
> +                     ic->i_flowctl? ", flow control" : "");
> +
> +     rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 1);
>  
>       /* Tune the RNR timeout. We use a rather low timeout, but
>        * not the absolute minimum - this should be tunable.
> @@ -129,6 +149,24 @@ static void rds_ib_cm_fill_conn_param(st
>               dp->dp_protocol_minor_mask = 
> cpu_to_be16(RDS_IB_SUPPORTED_PROTOCOLS);
>               dp->dp_ack_seq = rds_ib_piggyb_ack(ic);
>  
> +             /* Advertise flow control.
> +              *
> +              * Major chicken and egg alert!
> +              * We would like to post receive buffers before we get here (eg.
> +              * in rds_ib_setup_qp), so that we can give the peer an accurate
> +              * credit value.
> +              * Unfortunately we can't post receive buffers until we've 
> finished
> +              * protocol negotiation, and know in which order data and 
> payload
> +              * are arranged.
> +              *
> +              * What we do here is we give the peer a small initial credit, 
> and
> +              * initialize the number of posted buffers to a negative value.
> +              */
> +             if (ic->i_flowctl) {
> +                     atomic_set(&ic->i_credits, IB_SET_POST_CREDITS(-4));
> +                     dp->dp_credit = cpu_to_be32(4);
> +             }
> +
>               conn_param->private_data = dp;
>               conn_param->private_data_len = sizeof(*dp);
>       }
> @@ -363,6 +401,7 @@ static int rds_ib_cm_handle_connect(stru
>       ic = conn->c_transport_data;
>  
>       rds_ib_set_protocol(conn, version);
> +     rds_ib_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
>  
>       /* If the peer gave us the last packet it saw, process this as if
>        * we had received a regular ACK. */
> @@ -428,6 +467,7 @@ out:
>  static int rds_ib_cm_initiate_connect(struct rdma_cm_id *cm_id)
>  {
>       struct rds_connection *conn = cm_id->context;
> +     struct rds_ib_connection *ic = conn->c_transport_data;
>       struct rdma_conn_param conn_param;
>       struct rds_ib_connect_private dp;
>       int ret;
> @@ -435,6 +475,7 @@ static int rds_ib_cm_initiate_connect(st
>       /* If the peer doesn't do protocol negotiation, we must
>        * default to RDSv3.0 */
>       rds_ib_set_protocol(conn, RDS_PROTOCOL_3_0);
> +     ic->i_flowctl = rds_ib_sysctl_flow_control;     /* advertise flow 
> control */
>  
>       ret = rds_ib_setup_qp(conn);
>       if (ret) {
> @@ -688,6 +729,10 @@ void rds_ib_conn_shutdown(struct rds_con
>  #endif
>       ic->i_ack_recv = 0;
>  
> +     /* Clear flow control state */
> +     ic->i_flowctl = 0;
> +     atomic_set(&ic->i_credits, 0);
> +
>       if (ic->i_ibinc) {
>               rds_inc_put(&ic->i_ibinc->ii_inc);
>               ic->i_ibinc = NULL;
> Index: ofa_kernel-1.3/net/rds/ib_recv.c
> ===================================================================
> --- ofa_kernel-1.3.orig/net/rds/ib_recv.c
> +++ ofa_kernel-1.3/net/rds/ib_recv.c
> @@ -220,16 +220,17 @@ out:
>   * -1 is returned if posting fails due to temporary resource exhaustion.
>   */
>  int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
> -                    gfp_t page_gfp)
> +                    gfp_t page_gfp, int prefill)
>  {
>       struct rds_ib_connection *ic = conn->c_transport_data;
>       struct rds_ib_recv_work *recv;
>       struct ib_recv_wr *failed_wr;
> +     unsigned int posted = 0;
>       int ret = 0;
>       u32 pos;
>  
> -     while (rds_conn_up(conn) && rds_ib_ring_alloc(&ic->i_recv_ring, 1, 
> &pos)) {
> -
> +     while ((prefill || rds_conn_up(conn))
> +                     && rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
>               if (pos >= ic->i_recv_ring.w_nr) {
>                       printk(KERN_NOTICE "Argh - ring alloc returned 
> pos=%u\n",
>                                       pos);
> @@ -257,8 +258,14 @@ int rds_ib_recv_refill(struct rds_connec
>                       ret = -1;
>                       break;
>               }
> +
> +             posted++;
>       }
>  
> +     /* We're doing flow control - update the window. */
> +     if (ic->i_flowctl && posted)
> +             rds_ib_advertise_credits(conn, posted);
> +
>       if (ret)
>               rds_ib_ring_unalloc(&ic->i_recv_ring, 1);
>       return ret;
> @@ -436,7 +443,7 @@ static u64 rds_ib_get_ack(struct rds_ib_
>  #endif
>  
>  
> -static void rds_ib_send_ack(struct rds_ib_connection *ic)
> +static void rds_ib_send_ack(struct rds_ib_connection *ic, unsigned int 
> adv_credits)
>  {
>       struct rds_header *hdr = ic->i_ack;
>       struct ib_send_wr *failed_wr;
> @@ -448,6 +455,7 @@ static void rds_ib_send_ack(struct rds_i
>       rdsdebug("send_ack: ic %p ack %llu\n", ic, (unsigned long long) seq);
>       rds_message_populate_header(hdr, 0, 0, 0);
>       hdr->h_ack = cpu_to_be64(seq);
> +     hdr->h_credit = adv_credits;
>       rds_message_make_checksum(hdr);
>       ic->i_ack_queued = jiffies;
>  
> @@ -460,6 +468,8 @@ static void rds_ib_send_ack(struct rds_i
>               set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
>  
>               rds_ib_stats_inc(s_ib_ack_send_failure);
> +             /* Need to finesse this later. */
> +             BUG();
>       } else
>               rds_ib_stats_inc(s_ib_ack_sent);
>  }
> @@ -502,15 +512,27 @@ static void rds_ib_send_ack(struct rds_i
>   * When we get here, we're called from the recv queue handler.
>   * Check whether we ought to transmit an ACK.
>   */
> -static void rds_ib_attempt_ack(struct rds_ib_connection *ic)
> +void rds_ib_attempt_ack(struct rds_ib_connection *ic)
>  {
> +     unsigned int adv_credits;
> +
>       if (!test_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
>               return;
> -     if (!test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
> -             clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
> -             rds_ib_send_ack(ic);
> -     } else
> +
> +     if (test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
>               rds_ib_stats_inc(s_ib_ack_send_delayed);
> +             return;
> +     }
> +
> +     /* Can we get a send credit? */
> +     if (!rds_ib_send_grab_credits(ic, 1, &adv_credits)) {
> +             rds_ib_stats_inc(s_ib_tx_throttle);
> +             clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
> +             return;
> +     }
> +
> +     clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
> +     rds_ib_send_ack(ic, adv_credits);
>  }
>  
>  /*
> @@ -706,6 +728,10 @@ void rds_ib_process_recv(struct rds_conn
>       state->ack_recv = be64_to_cpu(ihdr->h_ack);
>       state->ack_recv_valid = 1;
>  
> +     /* Process the credits update if there was one */
> +     if (ihdr->h_credit)
> +             rds_ib_send_add_credits(conn, ihdr->h_credit);
> +
>       if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && byte_len == 0) {
>               /* This is an ACK-only packet. The fact that it gets
>                * special treatment here is that historically, ACKs
> @@ -877,7 +903,7 @@ void rds_ib_recv_cq_comp_handler(struct 
>  
>       if (mutex_trylock(&ic->i_recv_mutex)) {
>               if (rds_ib_recv_refill(conn, GFP_ATOMIC,
> -                                      GFP_ATOMIC | __GFP_HIGHMEM))
> +                                      GFP_ATOMIC | __GFP_HIGHMEM, 0))
>                       ret = -EAGAIN;
>               else
>                       rds_ib_stats_inc(s_ib_rx_refill_from_cq);
> @@ -901,7 +927,7 @@ int rds_ib_recv(struct rds_connection *c
>        * we're really low and we want the caller to back off for a bit.
>        */
>       mutex_lock(&ic->i_recv_mutex);
> -     if (rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER))
> +     if (rds_ib_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 0))
>               ret = -ENOMEM;
>       else
>               rds_ib_stats_inc(s_ib_rx_refill_from_thread);
> Index: ofa_kernel-1.3/net/rds/ib.c
> ===================================================================
> --- ofa_kernel-1.3.orig/net/rds/ib.c
> +++ ofa_kernel-1.3/net/rds/ib.c
> @@ -187,6 +187,7 @@ static void rds_ib_exit(void)
>  
>  struct rds_transport rds_ib_transport = {
>       .laddr_check            = rds_ib_laddr_check,
> +     .xmit_complete          = rds_ib_xmit_complete,
>       .xmit                   = rds_ib_xmit,
>       .xmit_cong_map          = NULL,
>       .xmit_rdma              = rds_ib_xmit_rdma,
> Index: ofa_kernel-1.3/net/rds/ib_send.c
> ===================================================================
> --- ofa_kernel-1.3.orig/net/rds/ib_send.c
> +++ ofa_kernel-1.3/net/rds/ib_send.c
> @@ -245,6 +245,144 @@ void rds_ib_send_cq_comp_handler(struct 
>       }
>  }
>  
> +/*
> + * This is the main function for allocating credits when sending
> + * messages.
> + *
> + * Conceptually, we have two counters:
> + *  -        send credits: this tells us how many WRs we're allowed
> + *   to submit without overruning the reciever's queue. For
> + *   each SEND WR we post, we decrement this by one.
> + *
> + *  -        posted credits: this tells us how many WRs we recently
> + *   posted to the receive queue. This value is transferred
> + *   to the peer as a "credit update" in a RDS header field.
> + *   Every time we transmit credits to the peer, we subtract
> + *   the amount of transferred credits from this counter.
> + *
> + * It is essential that we avoid situations where both sides have
> + * exhausted their send credits, and are unable to send new credits
> + * to the peer. We achieve this by requiring that we send at least
> + * one credit update to the peer before exhausting our credits.
> + * When new credits arrive, we subtract one credit that is withheld
> + * until we've posted new buffers and are ready to transmit these
> + * credits (see rds_ib_send_add_credits below).
> + *
> + * The RDS send code is essentially single-threaded; rds_send_xmit
> + * grabs c_send_sem to ensure exclusive access to the send ring.
> + * However, the ACK sending code is independent and can race with
> + * message SENDs.
> + *
> + * In the send path, we need to update the counters for send credits
> + * and the counter of posted buffers atomically - when we use the
> + * last available credit, we cannot allow another thread to race us
> + * and grab the posted credits counter.  Hence, we have to use a
> + * spinlock to protect the credit counter, or use atomics.
> + *
> + * Spinlocks shared between the send and the receive path are bad,
> + * because they create unnecessary delays. An early implementation
> + * using a spinlock showed a 5% degradation in throughput at some
> + * loads.
> + *
> + * This implementation avoids spinlocks completely, putting both
> + * counters into a single atomic, and updating that atomic using
> + * atomic_add (in the receive path, when receiving fresh credits),
> + * and using atomic_cmpxchg when updating the two counters.
> + */
> +int rds_ib_send_grab_credits(struct rds_ib_connection *ic,
> +                          u32 wanted, u32 *adv_credits)
> +{
> +     unsigned int avail, posted, got = 0, advertise;
> +     long oldval, newval;
> +
> +     *adv_credits = 0;
> +     if (!ic->i_flowctl)
> +             return wanted;
> +
> +try_again:
> +     advertise = 0;
> +     oldval = newval = atomic_read(&ic->i_credits);
> +     posted = IB_GET_POST_CREDITS(oldval);
> +     avail = IB_GET_SEND_CREDITS(oldval);
> +
> +     rdsdebug("rds_ib_send_grab_credits(%u): credits=%u posted=%u\n",
> +                     wanted, avail, posted);
> +
> +     /* The last credit must be used to send a credit updated. */
> +     if (avail && !posted)
> +             avail--;
> +
> +     if (avail < wanted) {
> +             struct rds_connection *conn = ic->i_cm_id->context;
> +
> +             /* Oops, there aren't that many credits left! */
> +             set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
> +             got = avail;
> +     } else {
> +             /* Sometimes you get what you want, lalala. */
> +             got = wanted;
> +     }
> +     newval -= IB_SET_SEND_CREDITS(got);
> +
> +     if (got && posted) {
> +             advertise = min_t(unsigned int, posted, RDS_MAX_ADV_CREDIT);
> +             newval -= IB_SET_POST_CREDITS(advertise);
> +     }
> +
> +     /* Finally bill everything */
> +     if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
> +             goto try_again;
> +
> +     *adv_credits = advertise;
> +     return got;
> +}
> +
> +void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int 
> credits)
> +{
> +     struct rds_ib_connection *ic = conn->c_transport_data;
> +
> +     if (credits == 0)
> +             return;
> +
> +     rdsdebug("rds_ib_send_add_credits(%u): current=%u%s\n",
> +                     credits,
> +                     IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits)),
> +                     test_bit(RDS_LL_SEND_FULL, &conn->c_flags)? ", 
> ll_send_full" : "");
> +
> +     atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
> +     if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
> +             queue_delayed_work(rds_wq, &conn->c_send_w, 0);
> +
> +     WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
> +
> +     rds_ib_stats_inc(s_ib_rx_credit_updates);
> +}
> +
> +void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int 
> posted)
> +{
> +     struct rds_ib_connection *ic = conn->c_transport_data;
> +
> +     if (posted == 0)
> +             return;
> +
> +     atomic_add(IB_SET_POST_CREDITS(posted), &ic->i_credits);
> +
> +     /* Decide whether to send an update to the peer now.
> +      * If we would send a credit update for every single buffer we
> +      * post, we would end up with an ACK storm (ACK arrives,
> +      * consumes buffer, we refill the ring, send ACK to remote
> +      * advertising the newly posted buffer... ad inf)
> +      *
> +      * Performance pretty much depends on how often we send
> +      * credit updates - too frequent updates mean lots of ACKs.
> +      * Too infrequent updates, and the peer will run out of
> +      * credits and has to throttle.
> +      * For the time being, 16 seems to be a good compromise.
> +      */
> +     if (IB_GET_POST_CREDITS(atomic_read(&ic->i_credits)) >= 16)
> +             set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
> +}
> +
>  static inline void
>  rds_ib_xmit_populate_wr(struct rds_ib_connection *ic,
>               struct rds_ib_send_work *send, unsigned int pos,
> @@ -307,6 +445,8 @@ int rds_ib_xmit(struct rds_connection *c
>       u32 pos;
>       u32 i;
>       u32 work_alloc;
> +     u32 credit_alloc;
> +     u32 adv_credits = 0;
>       int send_flags = 0;
>       int sent;
>       int ret;
> @@ -314,6 +454,7 @@ int rds_ib_xmit(struct rds_connection *c
>       BUG_ON(off % RDS_FRAG_SIZE);
>       BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
>  
> +     /* FIXME we may overallocate here */
>       if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
>               i = 1;
>       else
> @@ -327,8 +468,29 @@ int rds_ib_xmit(struct rds_connection *c
>               goto out;
>       }
>  
> +     credit_alloc = work_alloc;
> +     if (ic->i_flowctl) {
> +             credit_alloc = rds_ib_send_grab_credits(ic, work_alloc, 
> &adv_credits);
> +             if (credit_alloc < work_alloc) {
> +                     rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - 
> credit_alloc);
> +                     work_alloc = credit_alloc;
> +             }
> +             if (work_alloc == 0) {
> +                     rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
> +                     rds_ib_stats_inc(s_ib_tx_throttle);
> +                     ret = -ENOMEM;
> +                     goto out;
> +             }
> +     }
> +
>       /* map the message the first time we see it */
>       if (ic->i_rm == NULL) {
> +             /*
> +             printk(KERN_NOTICE "rds_ib_xmit prep msg dport=%u flags=0x%x 
> len=%d\n",
> +                             be16_to_cpu(rm->m_inc.i_hdr.h_dport),
> +                             rm->m_inc.i_hdr.h_flags,
> +                             be32_to_cpu(rm->m_inc.i_hdr.h_len));
> +                */
>               if (rm->m_nents) {
>                       rm->m_count = ib_dma_map_sg(dev,
>                                        rm->m_sg, rm->m_nents, DMA_TO_DEVICE);
> @@ -449,6 +611,24 @@ add_header:
>                * have been set up to point to the right header buffer. */
>               memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct 
> rds_header));
>  
> +             if (0) {
> +                     struct rds_header *hdr = &ic->i_send_hdrs[pos];
> +
> +                     printk(KERN_NOTICE "send WR dport=%u flags=0x%x 
> len=%d\n",
> +                             be16_to_cpu(hdr->h_dport),
> +                             hdr->h_flags,
> +                             be32_to_cpu(hdr->h_len));
> +             }
> +             if (adv_credits) {
> +                     struct rds_header *hdr = &ic->i_send_hdrs[pos];
> +
> +                     /* add credit and redo the header checksum */
> +                     hdr->h_credit = adv_credits;
> +                     rds_message_make_checksum(hdr);
> +                     adv_credits = 0;
> +                     rds_ib_stats_inc(s_ib_tx_credit_updates);
> +             }
> +
>               if (prev)
>                       prev->s_wr.next = &send->s_wr;
>               prev = send;
> @@ -472,6 +652,8 @@ add_header:
>               rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
>               work_alloc = i;
>       }
> +     if (ic->i_flowctl && i < credit_alloc)
> +             rds_ib_send_add_credits(conn, credit_alloc - i);
>  
>       /* XXX need to worry about failed_wr and partial sends. */
>       failed_wr = &first->s_wr;
> @@ -487,11 +669,14 @@ add_header:
>                       ic->i_rm = prev->s_rm;
>                       prev->s_rm = NULL;
>               }
> +             /* Finesse this later */
> +             BUG();
>               goto out;
>       }
>  
>       ret = sent;
>  out:
> +     BUG_ON(adv_credits);
>       return ret;
>  }
>  
> @@ -630,3 +815,12 @@ int rds_ib_xmit_rdma(struct rds_connecti
>  out:
>       return ret;
>  }
> +
> +void rds_ib_xmit_complete(struct rds_connection *conn)
> +{
> +     struct rds_ib_connection *ic = conn->c_transport_data;
> +
> +     /* We may have a pending ACK or window update we were unable
> +      * to send previously (due to flow control). Try again. */
> +     rds_ib_attempt_ack(ic);
> +}
> Index: ofa_kernel-1.3/net/rds/ib_stats.c
> ===================================================================
> --- ofa_kernel-1.3.orig/net/rds/ib_stats.c
> +++ ofa_kernel-1.3/net/rds/ib_stats.c
> @@ -46,14 +46,17 @@ static char *rds_ib_stat_names[] = {
>       "ib_tx_cq_call",
>       "ib_tx_cq_event",
>       "ib_tx_ring_full",
> +     "ib_tx_throttle",
>       "ib_tx_sg_mapping_failure",
>       "ib_tx_stalled",
> +     "ib_tx_credit_updates",
>       "ib_rx_cq_call",
>       "ib_rx_cq_event",
>       "ib_rx_ring_empty",
>       "ib_rx_refill_from_cq",
>       "ib_rx_refill_from_thread",
>       "ib_rx_alloc_limit",
> +     "ib_rx_credit_updates",
>       "ib_ack_sent",
>       "ib_ack_send_failure",
>       "ib_ack_send_delayed",
> Index: ofa_kernel-1.3/net/rds/rds.h
> ===================================================================
> --- ofa_kernel-1.3.orig/net/rds/rds.h
> +++ ofa_kernel-1.3/net/rds/rds.h
> @@ -170,6 +170,7 @@ struct rds_connection {
>  #define RDS_FLAG_CONG_BITMAP 0x01
>  #define RDS_FLAG_ACK_REQUIRED        0x02
>  #define RDS_FLAG_RETRANSMITTED       0x04
> +#define RDS_MAX_ADV_CREDIT   255
>  
>  /*
>   * Maximum space available for extension headers.
> @@ -183,7 +184,8 @@ struct rds_header {
>       __be16  h_sport;
>       __be16  h_dport;
>       u8      h_flags;
> -     u8      h_padding[5];
> +     u8      h_credit;
> +     u8      h_padding[4];
>       __sum16 h_csum;
>  
>       u8      h_exthdr[RDS_HEADER_EXT_SPACE];
> Index: ofa_kernel-1.3/net/rds/ib_sysctl.c
> ===================================================================
> --- ofa_kernel-1.3.orig/net/rds/ib_sysctl.c
> +++ ofa_kernel-1.3/net/rds/ib_sysctl.c
> @@ -53,6 +53,8 @@ unsigned long rds_ib_sysctl_max_unsig_by
>  static unsigned long rds_ib_sysctl_max_unsig_bytes_min = 1;
>  static unsigned long rds_ib_sysctl_max_unsig_bytes_max = ~0UL;
>  
> +unsigned int rds_ib_sysctl_flow_control = 1;
> +
>  ctl_table rds_ib_sysctl_table[] = {
>       {
>               .ctl_name       = 1,
> @@ -102,6 +104,14 @@ ctl_table rds_ib_sysctl_table[] = {
>               .mode           = 0644,
>               .proc_handler   = &proc_doulongvec_minmax,
>       },
> +     {
> +             .ctl_name       = 6,
> +             .procname       = "flow_control",
> +             .data           = &rds_ib_sysctl_flow_control,
> +             .maxlen         = sizeof(rds_ib_sysctl_flow_control),
> +             .mode           = 0644,
> +             .proc_handler   = &proc_dointvec,
> +     },
>       { .ctl_name = 0}
>  };
>  
> _______________________________________________
> general mailing list
> [email protected]
> http://lists.openfabrics.org/cgi-bin/mailman/listinfo/general
> 
> To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general
_______________________________________________
general mailing list
[email protected]
http://lists.openfabrics.org/cgi-bin/mailman/listinfo/general

To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general

Reply via email to