The branch main has been updated by jhb:

URL: https://cgit.FreeBSD.org/src/commit/?id=fd8f61d6e970fa443d393d330ae70c54c9a523a4

commit fd8f61d6e970fa443d393d330ae70c54c9a523a4
Author:     John Baldwin <[email protected]>
AuthorDate: 2022-02-08 00:20:06 +0000
Commit:     John Baldwin <[email protected]>
CommitDate: 2022-02-08 00:20:06 +0000

    cxgbei: Dispatch sent PDUs to the NIC asynchronously.
    
    Previously the driver was called to send PDUs to the NIC synchronously
    from the icl_conn_pdu_queue_cb callback.  However, this performed a
    fair bit of work while holding the icl connection lock.  Instead,
    change the callback to add sent PDUs to a STAILQ and defer dispatching
    of PDUs to the NIC to a helper thread similar to the scheme used in
    the TCP iSCSI backend.
    
    - Replace rx_flags int and the sole RXF_ACTIVE flag with a simple
      rx_active bool.
    
    - Add a pool of transmit worker threads for cxgbei.
    
    - Fix worker thread exit to depend on the wakeup in kthread_exit()
      to fix a race with module unload.
    
    Reported by:    mav
    Sponsored by:   Chelsio Communications
---
 sys/dev/cxgbe/cxgbei/cxgbei.c     | 187 +++++++++++++++++++++-----------------
 sys/dev/cxgbe/cxgbei/cxgbei.h     |  21 +++--
 sys/dev/cxgbe/cxgbei/icl_cxgbei.c | 172 +++++++++++++++++++++++++++++------
 3 files changed, 260 insertions(+), 120 deletions(-)

diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.c b/sys/dev/cxgbe/cxgbei/cxgbei.c
index 4a8df99b3d48..c06e39005197 100644
--- a/sys/dev/cxgbe/cxgbei/cxgbei.c
+++ b/sys/dev/cxgbe/cxgbei/cxgbei.c
@@ -95,8 +95,9 @@ __FBSDID("$FreeBSD$");
 #include "cxgbei.h"
 
 static int worker_thread_count;
-static struct cxgbei_worker_thread_softc *cwt_softc;
-static struct proc *cxgbei_proc;
+static struct cxgbei_worker_thread *cwt_rx_threads, *cwt_tx_threads;
+
+static void cwt_queue_for_rx(struct icl_cxgbei_conn *icc);
 
 static void
 read_pdu_limits(struct adapter *sc, uint32_t *max_tx_data_len,
@@ -585,17 +586,9 @@ do_rx_iscsi_ddp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
        icl_cxgbei_new_pdu_set_conn(ip, ic);
 
        STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next);
-       if ((icc->rx_flags & RXF_ACTIVE) == 0) {
-               struct cxgbei_worker_thread_softc *cwt = &cwt_softc[icc->cwt];
-
-               mtx_lock(&cwt->cwt_lock);
-               icc->rx_flags |= RXF_ACTIVE;
-               TAILQ_INSERT_TAIL(&cwt->rx_head, icc, rx_link);
-               if (cwt->cwt_state == CWT_SLEEPING) {
-                       cwt->cwt_state = CWT_RUNNING;
-                       cv_signal(&cwt->cwt_cv);
-               }
-               mtx_unlock(&cwt->cwt_lock);
+       if (!icc->rx_active) {
+               icc->rx_active = true;
+               cwt_queue_for_rx(icc);
        }
        SOCKBUF_UNLOCK(sb);
        INP_WUNLOCK(inp);
@@ -836,17 +829,9 @@ do_rx_iscsi_cmp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
 
        /* Enqueue the PDU to the received pdus queue. */
        STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next);
-       if ((icc->rx_flags & RXF_ACTIVE) == 0) {
-               struct cxgbei_worker_thread_softc *cwt = &cwt_softc[icc->cwt];
-
-               mtx_lock(&cwt->cwt_lock);
-               icc->rx_flags |= RXF_ACTIVE;
-               TAILQ_INSERT_TAIL(&cwt->rx_head, icc, rx_link);
-               if (cwt->cwt_state == CWT_SLEEPING) {
-                       cwt->cwt_state = CWT_RUNNING;
-                       cv_signal(&cwt->cwt_cv);
-               }
-               mtx_unlock(&cwt->cwt_lock);
+       if (!icc->rx_active) {
+               icc->rx_active = true;
+               cwt_queue_for_rx(icc);
        }
        SOCKBUF_UNLOCK(sb);
        INP_WUNLOCK(inp);
@@ -944,9 +929,9 @@ static struct uld_info cxgbei_uld_info = {
 };
 
 static void
-cwt_main(void *arg)
+cwt_rx_main(void *arg)
 {
-       struct cxgbei_worker_thread_softc *cwt = arg;
+       struct cxgbei_worker_thread *cwt = arg;
        struct icl_cxgbei_conn *icc = NULL;
        struct icl_conn *ic;
        struct icl_pdu *ip;
@@ -962,8 +947,8 @@ cwt_main(void *arg)
 
        while (__predict_true(cwt->cwt_state != CWT_STOP)) {
                cwt->cwt_state = CWT_RUNNING;
-               while ((icc = TAILQ_FIRST(&cwt->rx_head)) != NULL) {
-                       TAILQ_REMOVE(&cwt->rx_head, icc, rx_link);
+               while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) {
+                       TAILQ_REMOVE(&cwt->icc_head, icc, rx_link);
                        mtx_unlock(&cwt->cwt_lock);
 
                        ic = &icc->ic;
@@ -979,7 +964,7 @@ cwt_main(void *arg)
                                 */
                                parse_pdus(icc, sb);
                        }
-                       MPASS(icc->rx_flags & RXF_ACTIVE);
+                       MPASS(icc->rx_active);
                        if (__predict_true(!(sb->sb_state & SBS_CANTRCVMORE))) {
                                MPASS(STAILQ_EMPTY(&rx_pdus));
                                STAILQ_SWAP(&icc->rcvd_pdus, &rx_pdus, icl_pdu);
@@ -994,11 +979,16 @@ cwt_main(void *arg)
                                SOCKBUF_LOCK(sb);
                                MPASS(STAILQ_EMPTY(&rx_pdus));
                        }
-                       MPASS(icc->rx_flags & RXF_ACTIVE);
+                       MPASS(icc->rx_active);
                        if (STAILQ_EMPTY(&icc->rcvd_pdus) ||
                            __predict_false(sb->sb_state & SBS_CANTRCVMORE)) {
-                               icc->rx_flags &= ~RXF_ACTIVE;
+                               icc->rx_active = false;
+                               SOCKBUF_UNLOCK(sb);
+
+                               mtx_lock(&cwt->cwt_lock);
                        } else {
+                               SOCKBUF_UNLOCK(sb);
+
                                /*
                                 * More PDUs were received while we were busy
                                 * handing over the previous batch to ICL.
@@ -1006,13 +996,9 @@ cwt_main(void *arg)
                                 * queue.
                                 */
                                mtx_lock(&cwt->cwt_lock);
-                               TAILQ_INSERT_TAIL(&cwt->rx_head, icc,
+                               TAILQ_INSERT_TAIL(&cwt->icc_head, icc,
                                    rx_link);
-                               mtx_unlock(&cwt->cwt_lock);
                        }
-                       SOCKBUF_UNLOCK(sb);
-
-                       mtx_lock(&cwt->cwt_lock);
                }
 
                /* Inner loop doesn't check for CWT_STOP, do that first. */
@@ -1022,84 +1008,121 @@ cwt_main(void *arg)
                cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
        }
 
-       MPASS(TAILQ_FIRST(&cwt->rx_head) == NULL);
-       mtx_assert(&cwt->cwt_lock, MA_OWNED);
-       cwt->cwt_state = CWT_STOPPED;
-       cv_signal(&cwt->cwt_cv);
+       MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL);
        mtx_unlock(&cwt->cwt_lock);
        kthread_exit();
 }
 
+static void
+cwt_queue_for_rx(struct icl_cxgbei_conn *icc)
+{
+       struct cxgbei_worker_thread *cwt = &cwt_rx_threads[icc->cwt];
+
+       mtx_lock(&cwt->cwt_lock);
+       TAILQ_INSERT_TAIL(&cwt->icc_head, icc, rx_link);
+       if (cwt->cwt_state == CWT_SLEEPING) {
+               cwt->cwt_state = CWT_RUNNING;
+               cv_signal(&cwt->cwt_cv);
+       }
+       mtx_unlock(&cwt->cwt_lock);
+}
+
+void
+cwt_queue_for_tx(struct icl_cxgbei_conn *icc)
+{
+       struct cxgbei_worker_thread *cwt = &cwt_tx_threads[icc->cwt];
+
+       mtx_lock(&cwt->cwt_lock);
+       TAILQ_INSERT_TAIL(&cwt->icc_head, icc, tx_link);
+       if (cwt->cwt_state == CWT_SLEEPING) {
+               cwt->cwt_state = CWT_RUNNING;
+               cv_signal(&cwt->cwt_cv);
+       }
+       mtx_unlock(&cwt->cwt_lock);
+}
+
 static int
 start_worker_threads(void)
 {
+       struct proc *cxgbei_proc;
        int i, rc;
-       struct cxgbei_worker_thread_softc *cwt;
+       struct cxgbei_worker_thread *cwt;
 
        worker_thread_count = min(mp_ncpus, 32);
-       cwt_softc = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
+       cwt_rx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
+           M_WAITOK | M_ZERO);
+       cwt_tx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
            M_WAITOK | M_ZERO);
 
-       MPASS(cxgbei_proc == NULL);
-       for (i = 0, cwt = &cwt_softc[0]; i < worker_thread_count; i++, cwt++) {
+       for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count;
+            i++, cwt++) {
+               mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF);
+               cv_init(&cwt->cwt_cv, "cwt cv");
+               TAILQ_INIT(&cwt->icc_head);
+       }
+
+       for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count;
+            i++, cwt++) {
                mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF);
                cv_init(&cwt->cwt_cv, "cwt cv");
-               TAILQ_INIT(&cwt->rx_head);
-               rc = kproc_kthread_add(cwt_main, cwt, &cxgbei_proc, NULL, 0, 0,
-                   "cxgbei", "%d", i);
+               TAILQ_INIT(&cwt->icc_head);
+       }
+
+       cxgbei_proc = NULL;
+       for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count;
+            i++, cwt++) {
+               rc = kproc_kthread_add(cwt_rx_main, cwt, &cxgbei_proc,
+                   &cwt->cwt_td, 0, 0, "cxgbei", "rx %d", i);
                if (rc != 0) {
-                       printf("cxgbei: failed to start thread #%d/%d (%d)\n",
+                       printf("cxgbei: failed to start rx thread #%d/%d (%d)\n",
                            i + 1, worker_thread_count, rc);
-                       mtx_destroy(&cwt->cwt_lock);
-                       cv_destroy(&cwt->cwt_cv);
-                       bzero(cwt, sizeof(*cwt));
-                       if (i == 0) {
-                               free(cwt_softc, M_CXGBE);
-                               worker_thread_count = 0;
-
-                               return (rc);
-                       }
-
-                       /* Not fatal, carry on with fewer threads. */
-                       worker_thread_count = i;
-                       rc = 0;
-                       break;
+                       return (rc);
                }
+       }
 
-               /* Wait for thread to start before moving on to the next one. */
-               mtx_lock(&cwt->cwt_lock);
-               while (cwt->cwt_state == 0)
-                       cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
-               mtx_unlock(&cwt->cwt_lock);
+       for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count;
+            i++, cwt++) {
+               rc = kproc_kthread_add(cwt_tx_main, cwt, &cxgbei_proc,
+                   &cwt->cwt_td, 0, 0, "cxgbei", "tx %d", i);
+               if (rc != 0) {
+                       printf("cxgbei: failed to start tx thread #%d/%d (%d)\n",
+                           i + 1, worker_thread_count, rc);
+                       return (rc);
+               }
        }
 
-       MPASS(cwt_softc != NULL);
-       MPASS(worker_thread_count > 0);
        return (0);
 }
 
 static void
-stop_worker_threads(void)
+stop_worker_threads1(struct cxgbei_worker_thread *threads)
 {
+       struct cxgbei_worker_thread *cwt;
        int i;
-       struct cxgbei_worker_thread_softc *cwt = &cwt_softc[0];
 
-       MPASS(worker_thread_count >= 0);
-
-       for (i = 0, cwt = &cwt_softc[0]; i < worker_thread_count; i++, cwt++) {
+       for (i = 0, cwt = &threads[0]; i < worker_thread_count; i++, cwt++) {
                mtx_lock(&cwt->cwt_lock);
-               MPASS(cwt->cwt_state == CWT_RUNNING ||
-                   cwt->cwt_state == CWT_SLEEPING);
-               cwt->cwt_state = CWT_STOP;
-               cv_signal(&cwt->cwt_cv);
-               do {
-                       cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
-               } while (cwt->cwt_state != CWT_STOPPED);
+               if (cwt->cwt_td != NULL) {
+                       MPASS(cwt->cwt_state == CWT_RUNNING ||
+                           cwt->cwt_state == CWT_SLEEPING);
+                       cwt->cwt_state = CWT_STOP;
+                       cv_signal(&cwt->cwt_cv);
+                       mtx_sleep(cwt->cwt_td, &cwt->cwt_lock, 0, "cwtstop", 0);
+               }
                mtx_unlock(&cwt->cwt_lock);
                mtx_destroy(&cwt->cwt_lock);
                cv_destroy(&cwt->cwt_cv);
        }
-       free(cwt_softc, M_CXGBE);
+       free(threads, M_CXGBE);
+}
+
+static void
+stop_worker_threads(void)
+{
+
+       MPASS(worker_thread_count >= 0);
+       stop_worker_threads1(cwt_rx_threads);
+       stop_worker_threads1(cwt_tx_threads);
 }
 
 /* Select a worker thread for a connection. */
diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.h b/sys/dev/cxgbe/cxgbei/cxgbei.h
index 58a5dac6d63b..b078f3110d62 100644
--- a/sys/dev/cxgbe/cxgbei/cxgbei.h
+++ b/sys/dev/cxgbe/cxgbei/cxgbei.h
@@ -36,23 +36,19 @@ enum {
        CWT_SLEEPING    = 1,
        CWT_RUNNING     = 2,
        CWT_STOP        = 3,
-       CWT_STOPPED     = 4,
 };
 
-struct cxgbei_worker_thread_softc {
+struct cxgbei_worker_thread {
        struct mtx      cwt_lock;
        struct cv       cwt_cv;
        volatile int    cwt_state;
+       struct thread   *cwt_td;
 
-       TAILQ_HEAD(, icl_cxgbei_conn) rx_head;
+       TAILQ_HEAD(, icl_cxgbei_conn) icc_head;
 } __aligned(CACHE_LINE_SIZE);
 
 #define CXGBEI_CONN_SIGNATURE 0x56788765
 
-enum {
-       RXF_ACTIVE      = 1 << 0,       /* In the worker thread's queue */
-};
-
 struct cxgbei_cmp {
        LIST_ENTRY(cxgbei_cmp) link;
 
@@ -71,16 +67,21 @@ struct icl_cxgbei_conn {
        int ulp_submode;
        struct adapter *sc;
        struct toepcb *toep;
+       u_int cwt;
 
        /* Receive related. */
-       u_int rx_flags;                         /* protected by so_rcv lock */
-       u_int cwt;
+       bool rx_active;                         /* protected by so_rcv lock */
        STAILQ_HEAD(, icl_pdu) rcvd_pdus;       /* protected by so_rcv lock */
        TAILQ_ENTRY(icl_cxgbei_conn) rx_link;   /* protected by cwt lock */
 
        struct cxgbei_cmp_head *cmp_table;      /* protected by cmp_lock */
        struct mtx cmp_lock;
        unsigned long cmp_hash_mask;
+
+       /* Transmit related. */
+       bool tx_active;                         /* protected by ic lock */
+       STAILQ_HEAD(, icl_pdu) sent_pdus;       /* protected by ic lock */
+       TAILQ_ENTRY(icl_cxgbei_conn) tx_link;   /* protected by cwt lock */
 };
 
 static inline struct icl_cxgbei_conn *
@@ -134,8 +135,10 @@ struct cxgbei_data {
 
 /* cxgbei.c */
 u_int cxgbei_select_worker_thread(struct icl_cxgbei_conn *);
+void cwt_queue_for_tx(struct icl_cxgbei_conn *);
 
 /* icl_cxgbei.c */
+void cwt_tx_main(void *);
 int icl_cxgbei_mod_load(void);
 int icl_cxgbei_mod_unload(void);
 struct icl_pdu *icl_cxgbei_new_pdu(int);
diff --git a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
index f66a959f6311..516ab931a49c 100644
--- a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
+++ b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
@@ -421,6 +421,128 @@ finalize_pdu(struct icl_cxgbei_conn *icc, struct icl_cxgbei_pdu *icp)
        return (m);
 }
 
+static void
+cwt_push_pdus(struct icl_cxgbei_conn *icc, struct socket *so, struct mbufq *mq)
+{
+       struct epoch_tracker et;
+       struct icl_conn *ic = &icc->ic;
+       struct toepcb *toep = icc->toep;
+       struct inpcb *inp;
+
+       /*
+        * Do not get inp from toep->inp as the toepcb might have
+        * detached already.
+        */
+       inp = sotoinpcb(so);
+       CURVNET_SET(toep->vnet);
+       NET_EPOCH_ENTER(et);
+       INP_WLOCK(inp);
+
+       ICL_CONN_UNLOCK(ic);
+       if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) ||
+           __predict_false((toep->flags & TPF_ATTACHED) == 0)) {
+               mbufq_drain(mq);
+       } else {
+               mbufq_concat(&toep->ulp_pduq, mq);
+               t4_push_pdus(icc->sc, toep, 0);
+       }
+       INP_WUNLOCK(inp);
+       NET_EPOCH_EXIT(et);
+       CURVNET_RESTORE();
+
+       ICL_CONN_LOCK(ic);
+}
+
+void
+cwt_tx_main(void *arg)
+{
+       struct cxgbei_worker_thread *cwt = arg;
+       struct icl_cxgbei_conn *icc;
+       struct icl_conn *ic;
+       struct icl_pdu *ip;
+       struct socket *so;
+       struct mbuf *m;
+       struct mbufq mq;
+       STAILQ_HEAD(, icl_pdu) tx_pdus = STAILQ_HEAD_INITIALIZER(tx_pdus);
+
+       MPASS(cwt != NULL);
+
+       mtx_lock(&cwt->cwt_lock);
+       MPASS(cwt->cwt_state == 0);
+       cwt->cwt_state = CWT_RUNNING;
+       cv_signal(&cwt->cwt_cv);
+
+       mbufq_init(&mq, INT_MAX);
+       while (__predict_true(cwt->cwt_state != CWT_STOP)) {
+               cwt->cwt_state = CWT_RUNNING;
+               while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) {
+                       TAILQ_REMOVE(&cwt->icc_head, icc, tx_link);
+                       mtx_unlock(&cwt->cwt_lock);
+
+                       ic = &icc->ic;
+
+                       ICL_CONN_LOCK(ic);
+                       MPASS(icc->tx_active);
+                       STAILQ_SWAP(&icc->sent_pdus, &tx_pdus, icl_pdu);
+                       ICL_CONN_UNLOCK(ic);
+
+                       while ((ip = STAILQ_FIRST(&tx_pdus)) != NULL) {
+                               STAILQ_REMOVE_HEAD(&tx_pdus, ip_next);
+
+                               m = finalize_pdu(icc, ip_to_icp(ip));
+                               M_ASSERTPKTHDR(m);
+                               MPASS((m->m_pkthdr.len & 3) == 0);
+
+                               mbufq_enqueue(&mq, m);
+                       }
+
+                       ICL_CONN_LOCK(ic);
+                       so = ic->ic_socket;
+                       if (__predict_false(ic->ic_disconnecting) ||
+                           __predict_false(so == NULL)) {
+                               mbufq_drain(&mq);
+                               icc->tx_active = false;
+                               ICL_CONN_UNLOCK(ic);
+
+                               mtx_lock(&cwt->cwt_lock);
+                               continue;
+                       }
+
+                       cwt_push_pdus(icc, so, &mq);
+
+                       MPASS(icc->tx_active);
+                       if (STAILQ_EMPTY(&icc->sent_pdus)) {
+                               icc->tx_active = false;
+                               ICL_CONN_UNLOCK(ic);
+
+                               mtx_lock(&cwt->cwt_lock);
+                       } else {
+                               ICL_CONN_UNLOCK(ic);
+
+                               /*
+                                * More PDUs were queued while we were
+                                * busy sending the previous batch.
+                                * Re-add this connection to the end
+                                * of the queue.
+                                */
+                               mtx_lock(&cwt->cwt_lock);
+                               TAILQ_INSERT_TAIL(&cwt->icc_head, icc,
+                                   tx_link);
+                       }
+               }
+
+               /* Inner loop doesn't check for CWT_STOP, do that first. */
+               if (__predict_false(cwt->cwt_state == CWT_STOP))
+                       break;
+               cwt->cwt_state = CWT_SLEEPING;
+               cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
+       }
+
+       MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL);
+       mtx_unlock(&cwt->cwt_lock);
+       kthread_exit();
+}
+
 int
 icl_cxgbei_conn_pdu_append_data(struct icl_conn *ic, struct icl_pdu *ip,
     const void *addr, size_t len, int flags)
@@ -534,13 +656,9 @@ void
 icl_cxgbei_conn_pdu_queue_cb(struct icl_conn *ic, struct icl_pdu *ip,
                             icl_pdu_cb cb)
 {
-       struct epoch_tracker et;
        struct icl_cxgbei_conn *icc = ic_to_icc(ic);
        struct icl_cxgbei_pdu *icp = ip_to_icp(ip);
        struct socket *so = ic->ic_socket;
-       struct toepcb *toep = icc->toep;
-       struct inpcb *inp;
-       struct mbuf *m;
 
        MPASS(ic == ip->ip_conn);
        MPASS(ip->ip_bhs_mbuf != NULL);
@@ -557,28 +675,11 @@ icl_cxgbei_conn_pdu_queue_cb(struct icl_conn *ic, struct icl_pdu *ip,
                return;
        }
 
-       m = finalize_pdu(icc, icp);
-       M_ASSERTPKTHDR(m);
-       MPASS((m->m_pkthdr.len & 3) == 0);
-
-       /*
-        * Do not get inp from toep->inp as the toepcb might have detached
-        * already.
-        */
-       inp = sotoinpcb(so);
-       CURVNET_SET(toep->vnet);
-       NET_EPOCH_ENTER(et);
-       INP_WLOCK(inp);
-       if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) ||
-           __predict_false((toep->flags & TPF_ATTACHED) == 0))
-               m_freem(m);
-       else {
-               mbufq_enqueue(&toep->ulp_pduq, m);
-               t4_push_pdus(icc->sc, toep, 0);
+       STAILQ_INSERT_TAIL(&icc->sent_pdus, ip, ip_next);
+       if (!icc->tx_active) {
+               icc->tx_active = true;
+               cwt_queue_for_tx(icc);
        }
-       INP_WUNLOCK(inp);
-       NET_EPOCH_EXIT(et);
-       CURVNET_RESTORE();
 }
 
 static struct icl_conn *
@@ -593,6 +694,7 @@ icl_cxgbei_new_conn(const char *name, struct mtx *lock)
            M_WAITOK | M_ZERO);
        icc->icc_signature = CXGBEI_CONN_SIGNATURE;
        STAILQ_INIT(&icc->rcvd_pdus);
+       STAILQ_INIT(&icc->sent_pdus);
 
        icc->cmp_table = hashinit(64, M_CXGBEI, &icc->cmp_hash_mask);
        mtx_init(&icc->cmp_lock, "cxgbei_cmp", NULL, MTX_DEF);
@@ -935,21 +1037,33 @@ icl_cxgbei_conn_close(struct icl_conn *ic)
        if (toep != NULL) {     /* NULL if connection was never offloaded. */
                toep->ulpcb = NULL;
 
+               /*
+                * Wait for the cwt threads to stop processing this
+                * connection for transmit.
+                */
+               while (icc->tx_active)
+                       rw_sleep(inp, &inp->inp_lock, 0, "conclo", 1);
+
                /* Discard PDUs queued for TX. */
+               while (!STAILQ_EMPTY(&icc->sent_pdus)) {
+                       ip = STAILQ_FIRST(&icc->sent_pdus);
+                       STAILQ_REMOVE_HEAD(&icc->sent_pdus, ip_next);
+                       icl_cxgbei_pdu_done(ip, ENOTCONN);
+               }
                mbufq_drain(&toep->ulp_pduq);
 
                /*
                 * Wait for the cwt threads to stop processing this
-                * connection.
+                * connection for receive.
                 */
                SOCKBUF_LOCK(sb);
-               if (icc->rx_flags & RXF_ACTIVE) {
-                       volatile u_int *p = &icc->rx_flags;
+               if (icc->rx_active) {
+                       volatile bool *p = &icc->rx_active;
 
                        SOCKBUF_UNLOCK(sb);
                        INP_WUNLOCK(inp);
 
-                       while (*p & RXF_ACTIVE)
+                       while (*p)
                                pause("conclo", 1);
 
                        INP_WLOCK(inp);

Reply via email to