The branch main has been updated by jhb:

URL: https://cgit.FreeBSD.org/src/commit/?id=511b83b167b02608a1f857fa2e0cc5fb3218e09d

commit 511b83b167b02608a1f857fa2e0cc5fb3218e09d
Author:     John Baldwin <[email protected]>
AuthorDate: 2022-02-08 00:20:40 +0000
Commit:     John Baldwin <[email protected]>
CommitDate: 2022-02-08 00:20:40 +0000

    cxgbei: Replace worker thread pools with per-connection kthreads.
    
    Having a single pool of worker threads adds extra complexity and
    overhead.  The software backend also uses per-connection kthreads.
    
    Sponsored by:   Chelsio Communications
---
 sys/dev/cxgbe/cxgbei/cxgbei.c     | 237 +---------------------------
 sys/dev/cxgbe/cxgbei/cxgbei.h     |  22 +--
 sys/dev/cxgbe/cxgbei/icl_cxgbei.c | 320 ++++++++++++++++++++------------------
 3 files changed, 178 insertions(+), 401 deletions(-)

diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.c b/sys/dev/cxgbe/cxgbei/cxgbei.c
index c06e39005197..979feace81dd 100644
--- a/sys/dev/cxgbe/cxgbei/cxgbei.c
+++ b/sys/dev/cxgbe/cxgbei/cxgbei.c
@@ -94,11 +94,6 @@ __FBSDID("$FreeBSD$");
 #include "tom/t4_tom.h"
 #include "cxgbei.h"
 
-static int worker_thread_count;
-static struct cxgbei_worker_thread *cwt_rx_threads, *cwt_tx_threads;
-
-static void cwt_queue_for_rx(struct icl_cxgbei_conn *icc);
-
 static void
 read_pdu_limits(struct adapter *sc, uint32_t *max_tx_data_len,
     uint32_t *max_rx_data_len, struct ppod_region *pr)
@@ -424,7 +419,7 @@ parse_pdu(struct socket *so, struct toepcb *toep, struct icl_cxgbei_conn *icc,
        return (ip);
 }
 
-static void
+void
 parse_pdus(struct icl_cxgbei_conn *icc, struct sockbuf *sb)
 {
        struct icl_conn *ic = &icc->ic;
@@ -588,7 +583,7 @@ do_rx_iscsi_ddp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
        STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next);
        if (!icc->rx_active) {
                icc->rx_active = true;
-               cwt_queue_for_rx(icc);
+               wakeup(&icc->rx_active);
        }
        SOCKBUF_UNLOCK(sb);
        INP_WUNLOCK(inp);
@@ -831,7 +826,7 @@ do_rx_iscsi_cmp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
        STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next);
        if (!icc->rx_active) {
                icc->rx_active = true;
-               cwt_queue_for_rx(icc);
+               wakeup(&icc->rx_active);
        }
        SOCKBUF_UNLOCK(sb);
        INP_WUNLOCK(inp);
@@ -928,222 +923,6 @@ static struct uld_info cxgbei_uld_info = {
        .deactivate = cxgbei_deactivate,
 };
 
-static void
-cwt_rx_main(void *arg)
-{
-       struct cxgbei_worker_thread *cwt = arg;
-       struct icl_cxgbei_conn *icc = NULL;
-       struct icl_conn *ic;
-       struct icl_pdu *ip;
-       struct sockbuf *sb;
-       STAILQ_HEAD(, icl_pdu) rx_pdus = STAILQ_HEAD_INITIALIZER(rx_pdus);
-
-       MPASS(cwt != NULL);
-
-       mtx_lock(&cwt->cwt_lock);
-       MPASS(cwt->cwt_state == 0);
-       cwt->cwt_state = CWT_RUNNING;
-       cv_signal(&cwt->cwt_cv);
-
-       while (__predict_true(cwt->cwt_state != CWT_STOP)) {
-               cwt->cwt_state = CWT_RUNNING;
-               while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) {
-                       TAILQ_REMOVE(&cwt->icc_head, icc, rx_link);
-                       mtx_unlock(&cwt->cwt_lock);
-
-                       ic = &icc->ic;
-                       sb = &ic->ic_socket->so_rcv;
-
-                       SOCKBUF_LOCK(sb);
-                       if (__predict_false(sbused(sb)) != 0) {
-                               /*
-                                * PDUs were received before the tid
-                                * transitioned to ULP mode.  Convert
-                                * them to icl_cxgbei_pdus and insert
-                                * them into the head of rcvd_pdus.
-                                */
-                               parse_pdus(icc, sb);
-                       }
-                       MPASS(icc->rx_active);
-                       if (__predict_true(!(sb->sb_state & SBS_CANTRCVMORE))) {
-                               MPASS(STAILQ_EMPTY(&rx_pdus));
-                               STAILQ_SWAP(&icc->rcvd_pdus, &rx_pdus, icl_pdu);
-                               SOCKBUF_UNLOCK(sb);
-
-                               /* Hand over PDUs to ICL. */
-                               while ((ip = STAILQ_FIRST(&rx_pdus)) != NULL) {
-                                       STAILQ_REMOVE_HEAD(&rx_pdus, ip_next);
-                                       ic->ic_receive(ip);
-                               }
-
-                               SOCKBUF_LOCK(sb);
-                               MPASS(STAILQ_EMPTY(&rx_pdus));
-                       }
-                       MPASS(icc->rx_active);
-                       if (STAILQ_EMPTY(&icc->rcvd_pdus) ||
-                           __predict_false(sb->sb_state & SBS_CANTRCVMORE)) {
-                               icc->rx_active = false;
-                               SOCKBUF_UNLOCK(sb);
-
-                               mtx_lock(&cwt->cwt_lock);
-                       } else {
-                               SOCKBUF_UNLOCK(sb);
-
-                               /*
-                                * More PDUs were received while we were busy
-                                * handing over the previous batch to ICL.
-                                * Re-add this connection to the end of the
-                                * queue.
-                                */
-                               mtx_lock(&cwt->cwt_lock);
-                               TAILQ_INSERT_TAIL(&cwt->icc_head, icc,
-                                   rx_link);
-                       }
-               }
-
-               /* Inner loop doesn't check for CWT_STOP, do that first. */
-               if (__predict_false(cwt->cwt_state == CWT_STOP))
-                       break;
-               cwt->cwt_state = CWT_SLEEPING;
-               cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
-       }
-
-       MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL);
-       mtx_unlock(&cwt->cwt_lock);
-       kthread_exit();
-}
-
-static void
-cwt_queue_for_rx(struct icl_cxgbei_conn *icc)
-{
-       struct cxgbei_worker_thread *cwt = &cwt_rx_threads[icc->cwt];
-
-       mtx_lock(&cwt->cwt_lock);
-       TAILQ_INSERT_TAIL(&cwt->icc_head, icc, rx_link);
-       if (cwt->cwt_state == CWT_SLEEPING) {
-               cwt->cwt_state = CWT_RUNNING;
-               cv_signal(&cwt->cwt_cv);
-       }
-       mtx_unlock(&cwt->cwt_lock);
-}
-
-void
-cwt_queue_for_tx(struct icl_cxgbei_conn *icc)
-{
-       struct cxgbei_worker_thread *cwt = &cwt_tx_threads[icc->cwt];
-
-       mtx_lock(&cwt->cwt_lock);
-       TAILQ_INSERT_TAIL(&cwt->icc_head, icc, tx_link);
-       if (cwt->cwt_state == CWT_SLEEPING) {
-               cwt->cwt_state = CWT_RUNNING;
-               cv_signal(&cwt->cwt_cv);
-       }
-       mtx_unlock(&cwt->cwt_lock);
-}
-
-static int
-start_worker_threads(void)
-{
-       struct proc *cxgbei_proc;
-       int i, rc;
-       struct cxgbei_worker_thread *cwt;
-
-       worker_thread_count = min(mp_ncpus, 32);
-       cwt_rx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
-           M_WAITOK | M_ZERO);
-       cwt_tx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
-           M_WAITOK | M_ZERO);
-
-       for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count;
-            i++, cwt++) {
-               mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF);
-               cv_init(&cwt->cwt_cv, "cwt cv");
-               TAILQ_INIT(&cwt->icc_head);
-       }
-
-       for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count;
-            i++, cwt++) {
-               mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF);
-               cv_init(&cwt->cwt_cv, "cwt cv");
-               TAILQ_INIT(&cwt->icc_head);
-       }
-
-       cxgbei_proc = NULL;
-       for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count;
-            i++, cwt++) {
-               rc = kproc_kthread_add(cwt_rx_main, cwt, &cxgbei_proc,
-                   &cwt->cwt_td, 0, 0, "cxgbei", "rx %d", i);
-               if (rc != 0) {
-                       printf("cxgbei: failed to start rx thread #%d/%d (%d)\n",
-                           i + 1, worker_thread_count, rc);
-                       return (rc);
-               }
-       }
-
-       for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count;
-            i++, cwt++) {
-               rc = kproc_kthread_add(cwt_tx_main, cwt, &cxgbei_proc,
-                   &cwt->cwt_td, 0, 0, "cxgbei", "tx %d", i);
-               if (rc != 0) {
-                       printf("cxgbei: failed to start tx thread #%d/%d (%d)\n",
-                           i + 1, worker_thread_count, rc);
-                       return (rc);
-               }
-       }
-
-       return (0);
-}
-
-static void
-stop_worker_threads1(struct cxgbei_worker_thread *threads)
-{
-       struct cxgbei_worker_thread *cwt;
-       int i;
-
-       for (i = 0, cwt = &threads[0]; i < worker_thread_count; i++, cwt++) {
-               mtx_lock(&cwt->cwt_lock);
-               if (cwt->cwt_td != NULL) {
-                       MPASS(cwt->cwt_state == CWT_RUNNING ||
-                           cwt->cwt_state == CWT_SLEEPING);
-                       cwt->cwt_state = CWT_STOP;
-                       cv_signal(&cwt->cwt_cv);
-                       mtx_sleep(cwt->cwt_td, &cwt->cwt_lock, 0, "cwtstop", 0);
-               }
-               mtx_unlock(&cwt->cwt_lock);
-               mtx_destroy(&cwt->cwt_lock);
-               cv_destroy(&cwt->cwt_cv);
-       }
-       free(threads, M_CXGBE);
-}
-
-static void
-stop_worker_threads(void)
-{
-
-       MPASS(worker_thread_count >= 0);
-       stop_worker_threads1(cwt_rx_threads);
-       stop_worker_threads1(cwt_tx_threads);
-}
-
-/* Select a worker thread for a connection. */
-u_int
-cxgbei_select_worker_thread(struct icl_cxgbei_conn *icc)
-{
-       struct adapter *sc = icc->sc;
-       struct toepcb *toep = icc->toep;
-       u_int i, n;
-
-       n = worker_thread_count / sc->sge.nofldrxq;
-       if (n > 0)
-               i = toep->vi->pi->port_id * n + arc4random() % n;
-       else
-               i = arc4random() % worker_thread_count;
-
-       CTR3(KTR_CXGBE, "%s: tid %u, cwt %u", __func__, toep->tid, i);
-
-       return (i);
-}
-
 static int
 cxgbei_mod_load(void)
 {
@@ -1154,15 +933,9 @@ cxgbei_mod_load(void)
        t4_register_cpl_handler(CPL_RX_ISCSI_DDP, do_rx_iscsi_ddp);
        t4_register_cpl_handler(CPL_RX_ISCSI_CMP, do_rx_iscsi_cmp);
 
-       rc = start_worker_threads();
-       if (rc != 0)
-               return (rc);
-
        rc = t4_register_uld(&cxgbei_uld_info);
-       if (rc != 0) {
-               stop_worker_threads();
+       if (rc != 0)
                return (rc);
-       }
 
        t4_iterate(cxgbei_activate_all, NULL);
 
@@ -1178,8 +951,6 @@ cxgbei_mod_unload(void)
        if (t4_unregister_uld(&cxgbei_uld_info) == EBUSY)
                return (EBUSY);
 
-       stop_worker_threads();
-
        t4_register_cpl_handler(CPL_ISCSI_HDR, NULL);
        t4_register_cpl_handler(CPL_ISCSI_DATA, NULL);
        t4_register_cpl_handler(CPL_RX_ISCSI_DDP, NULL);
diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.h b/sys/dev/cxgbe/cxgbei/cxgbei.h
index b078f3110d62..d0e423ce5b2f 100644
--- a/sys/dev/cxgbe/cxgbei/cxgbei.h
+++ b/sys/dev/cxgbe/cxgbei/cxgbei.h
@@ -32,21 +32,6 @@
 
 #include <dev/iscsi/icl.h>
 
-enum {
-       CWT_SLEEPING    = 1,
-       CWT_RUNNING     = 2,
-       CWT_STOP        = 3,
-};
-
-struct cxgbei_worker_thread {
-       struct mtx      cwt_lock;
-       struct cv       cwt_cv;
-       volatile int    cwt_state;
-       struct thread   *cwt_td;
-
-       TAILQ_HEAD(, icl_cxgbei_conn) icc_head;
-} __aligned(CACHE_LINE_SIZE);
-
 #define CXGBEI_CONN_SIGNATURE 0x56788765
 
 struct cxgbei_cmp {
@@ -67,12 +52,12 @@ struct icl_cxgbei_conn {
        int ulp_submode;
        struct adapter *sc;
        struct toepcb *toep;
-       u_int cwt;
 
        /* Receive related. */
        bool rx_active;                         /* protected by so_rcv lock */
+       bool rx_exiting;                        /* protected by so_rcv lock */
        STAILQ_HEAD(, icl_pdu) rcvd_pdus;       /* protected by so_rcv lock */
-       TAILQ_ENTRY(icl_cxgbei_conn) rx_link;   /* protected by cwt lock */
+       struct thread *rx_thread;
 
        struct cxgbei_cmp_head *cmp_table;      /* protected by cmp_lock */
        struct mtx cmp_lock;
@@ -81,7 +66,7 @@ struct icl_cxgbei_conn {
        /* Transmit related. */
        bool tx_active;                         /* protected by ic lock */
        STAILQ_HEAD(, icl_pdu) sent_pdus;       /* protected by ic lock */
-       TAILQ_ENTRY(icl_cxgbei_conn) tx_link;   /* protected by cwt lock */
+       struct thread *tx_thread;
 };
 
 static inline struct icl_cxgbei_conn *
@@ -136,6 +121,7 @@ struct cxgbei_data {
 /* cxgbei.c */
 u_int cxgbei_select_worker_thread(struct icl_cxgbei_conn *);
 void cwt_queue_for_tx(struct icl_cxgbei_conn *);
+void parse_pdus(struct icl_cxgbei_conn *, struct sockbuf *);
 
 /* icl_cxgbei.c */
 void cwt_tx_main(void *);
diff --git a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
index 516ab931a49c..296d4f2d270a 100644
--- a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
+++ b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
@@ -422,124 +422,133 @@ finalize_pdu(struct icl_cxgbei_conn *icc, struct icl_cxgbei_pdu *icp)
 }
 
 static void
-cwt_push_pdus(struct icl_cxgbei_conn *icc, struct socket *so, struct mbufq *mq)
+icl_cxgbei_tx_main(void *arg)
 {
        struct epoch_tracker et;
+       struct icl_cxgbei_conn *icc = arg;
        struct icl_conn *ic = &icc->ic;
        struct toepcb *toep = icc->toep;
-       struct inpcb *inp;
-
-       /*
-        * Do not get inp from toep->inp as the toepcb might have
-        * detached already.
-        */
-       inp = sotoinpcb(so);
-       CURVNET_SET(toep->vnet);
-       NET_EPOCH_ENTER(et);
-       INP_WLOCK(inp);
-
-       ICL_CONN_UNLOCK(ic);
-       if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) ||
-           __predict_false((toep->flags & TPF_ATTACHED) == 0)) {
-               mbufq_drain(mq);
-       } else {
-               mbufq_concat(&toep->ulp_pduq, mq);
-               t4_push_pdus(icc->sc, toep, 0);
-       }
-       INP_WUNLOCK(inp);
-       NET_EPOCH_EXIT(et);
-       CURVNET_RESTORE();
-
-       ICL_CONN_LOCK(ic);
-}
-
-void
-cwt_tx_main(void *arg)
-{
-       struct cxgbei_worker_thread *cwt = arg;
-       struct icl_cxgbei_conn *icc;
-       struct icl_conn *ic;
+       struct socket *so = ic->ic_socket;
+       struct inpcb *inp = sotoinpcb(so);
        struct icl_pdu *ip;
-       struct socket *so;
        struct mbuf *m;
        struct mbufq mq;
        STAILQ_HEAD(, icl_pdu) tx_pdus = STAILQ_HEAD_INITIALIZER(tx_pdus);
 
-       MPASS(cwt != NULL);
+       mbufq_init(&mq, INT_MAX);
 
-       mtx_lock(&cwt->cwt_lock);
-       MPASS(cwt->cwt_state == 0);
-       cwt->cwt_state = CWT_RUNNING;
-       cv_signal(&cwt->cwt_cv);
+       ICL_CONN_LOCK(ic);
+       while (__predict_true(!ic->ic_disconnecting)) {
+               while (STAILQ_EMPTY(&icc->sent_pdus)) {
+                       icc->tx_active = false;
+                       mtx_sleep(&icc->tx_active, ic->ic_lock, 0, "-", 0);
+                       if (__predict_false(ic->ic_disconnecting))
+                               goto out;
+                       MPASS(icc->tx_active);
+               }
 
-       mbufq_init(&mq, INT_MAX);
-       while (__predict_true(cwt->cwt_state != CWT_STOP)) {
-               cwt->cwt_state = CWT_RUNNING;
-               while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) {
-                       TAILQ_REMOVE(&cwt->icc_head, icc, tx_link);
-                       mtx_unlock(&cwt->cwt_lock);
+               STAILQ_SWAP(&icc->sent_pdus, &tx_pdus, icl_pdu);
+               ICL_CONN_UNLOCK(ic);
 
-                       ic = &icc->ic;
+               while ((ip = STAILQ_FIRST(&tx_pdus)) != NULL) {
+                       STAILQ_REMOVE_HEAD(&tx_pdus, ip_next);
 
-                       ICL_CONN_LOCK(ic);
-                       MPASS(icc->tx_active);
-                       STAILQ_SWAP(&icc->sent_pdus, &tx_pdus, icl_pdu);
-                       ICL_CONN_UNLOCK(ic);
+                       m = finalize_pdu(icc, ip_to_icp(ip));
+                       M_ASSERTPKTHDR(m);
+                       MPASS((m->m_pkthdr.len & 3) == 0);
 
-                       while ((ip = STAILQ_FIRST(&tx_pdus)) != NULL) {
-                               STAILQ_REMOVE_HEAD(&tx_pdus, ip_next);
+                       mbufq_enqueue(&mq, m);
+               }
 
-                               m = finalize_pdu(icc, ip_to_icp(ip));
-                               M_ASSERTPKTHDR(m);
-                               MPASS((m->m_pkthdr.len & 3) == 0);
+               ICL_CONN_LOCK(ic);
+               if (__predict_false(ic->ic_disconnecting) ||
+                   __predict_false(ic->ic_socket == NULL)) {
+                       mbufq_drain(&mq);
+                       break;
+               }
 
-                               mbufq_enqueue(&mq, m);
-                       }
+               CURVNET_SET(toep->vnet);
+               NET_EPOCH_ENTER(et);
+               INP_WLOCK(inp);
 
-                       ICL_CONN_LOCK(ic);
-                       so = ic->ic_socket;
-                       if (__predict_false(ic->ic_disconnecting) ||
-                           __predict_false(so == NULL)) {
-                               mbufq_drain(&mq);
-                               icc->tx_active = false;
-                               ICL_CONN_UNLOCK(ic);
+               ICL_CONN_UNLOCK(ic);
+               if (__predict_false(inp->inp_flags & (INP_DROPPED |
+                   INP_TIMEWAIT)) ||
+                   __predict_false((toep->flags & TPF_ATTACHED) == 0)) {
+                       mbufq_drain(&mq);
+               } else {
+                       mbufq_concat(&toep->ulp_pduq, &mq);
+                       t4_push_pdus(icc->sc, toep, 0);
+               }
+               INP_WUNLOCK(inp);
+               NET_EPOCH_EXIT(et);
+               CURVNET_RESTORE();
 
-                               mtx_lock(&cwt->cwt_lock);
-                               continue;
-                       }
+               ICL_CONN_LOCK(ic);
+       }
+out:
+       ICL_CONN_UNLOCK(ic);
 
-                       cwt_push_pdus(icc, so, &mq);
+       kthread_exit();
+}
 
-                       MPASS(icc->tx_active);
-                       if (STAILQ_EMPTY(&icc->sent_pdus)) {
-                               icc->tx_active = false;
-                               ICL_CONN_UNLOCK(ic);
-
-                               mtx_lock(&cwt->cwt_lock);
-                       } else {
-                               ICL_CONN_UNLOCK(ic);
-
-                               /*
-                                * More PDUs were queued while we were
-                                * busy sending the previous batch.
-                                * Re-add this connection to the end
-                                * of the queue.
-                                */
-                               mtx_lock(&cwt->cwt_lock);
-                               TAILQ_INSERT_TAIL(&cwt->icc_head, icc,
-                                   tx_link);
-                       }
+static void
+icl_cxgbei_rx_main(void *arg)
+{
+       struct icl_cxgbei_conn *icc = arg;
+       struct icl_conn *ic = &icc->ic;
+       struct icl_pdu *ip;
+       struct sockbuf *sb;
+       STAILQ_HEAD(, icl_pdu) rx_pdus = STAILQ_HEAD_INITIALIZER(rx_pdus);
+       bool cantrcvmore;
+
+       sb = &ic->ic_socket->so_rcv;
+       SOCKBUF_LOCK(sb);
+       while (__predict_true(!ic->ic_disconnecting)) {
+               while (STAILQ_EMPTY(&icc->rcvd_pdus)) {
+                       icc->rx_active = false;
+                       mtx_sleep(&icc->rx_active, SOCKBUF_MTX(sb), 0, "-", 0);
+                       if (__predict_false(ic->ic_disconnecting))
+                               goto out;
+                       MPASS(icc->rx_active);
                }
 
-               /* Inner loop doesn't check for CWT_STOP, do that first. */
-               if (__predict_false(cwt->cwt_state == CWT_STOP))
-                       break;
-               cwt->cwt_state = CWT_SLEEPING;
-               cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
+               if (__predict_false(sbused(sb)) != 0) {
+                       /*
+                        * PDUs were received before the tid
+                        * transitioned to ULP mode.  Convert
+                        * them to icl_cxgbei_pdus and insert
+                        * them into the head of rcvd_pdus.
+                        */
+                       parse_pdus(icc, sb);
+               }
+               cantrcvmore = (sb->sb_state & SBS_CANTRCVMORE) != 0;
+               MPASS(STAILQ_EMPTY(&rx_pdus));
+               STAILQ_SWAP(&icc->rcvd_pdus, &rx_pdus, icl_pdu);
+               SOCKBUF_UNLOCK(sb);
+
+               /* Hand over PDUs to ICL. */
+               while ((ip = STAILQ_FIRST(&rx_pdus)) != NULL) {
+                       STAILQ_REMOVE_HEAD(&rx_pdus, ip_next);
+                       if (cantrcvmore)
+                               icl_cxgbei_pdu_done(ip, ENOTCONN);
+                       else
+                               ic->ic_receive(ip);
+               }
+
+               SOCKBUF_LOCK(sb);
        }
+out:
+       /*
+        * Since ic_disconnecting is set before the SOCKBUF_MTX is
+        * locked in icl_cxgbei_conn_close, the loop above can exit
+        * before icl_cxgbei_conn_close can lock SOCKBUF_MTX and block
+        * waiting for the thread exit.
+        */
+       while (!icc->rx_exiting)
+               mtx_sleep(&icc->rx_active, SOCKBUF_MTX(sb), 0, "-", 0);
+       SOCKBUF_UNLOCK(sb);
 
-       MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL);
-       mtx_unlock(&cwt->cwt_lock);
        kthread_exit();
 }
 
@@ -678,7 +687,7 @@ icl_cxgbei_conn_pdu_queue_cb(struct icl_conn *ic, struct icl_pdu *ip,
        STAILQ_INSERT_TAIL(&icc->sent_pdus, ip, ip_next);
        if (!icc->tx_active) {
                icc->tx_active = true;
-               cwt_queue_for_tx(icc);
+               wakeup(&icc->tx_active);
        }
 }
 
@@ -740,10 +749,8 @@ icl_cxgbei_setsockopt(struct icl_conn *ic, struct socket *so, int sspace,
        rs = max(recvspace, rspace);
 
        error = soreserve(so, ss, rs);
-       if (error != 0) {
-               icl_cxgbei_conn_close(ic);
+       if (error != 0)
                return (error);
-       }
        SOCKBUF_LOCK(&so->so_snd);
        so->so_snd.sb_flags |= SB_AUTOSIZE;
        SOCKBUF_UNLOCK(&so->so_snd);
@@ -761,10 +768,8 @@ icl_cxgbei_setsockopt(struct icl_conn *ic, struct socket *so, int sspace,
        opt.sopt_val = &one;
        opt.sopt_valsize = sizeof(one);
        error = sosetopt(so, &opt);
-       if (error != 0) {
-               icl_cxgbei_conn_close(ic);
+       if (error != 0)
                return (error);
-       }
 
        return (0);
 }
@@ -934,8 +939,10 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd)
        fa.sc = NULL;
        fa.so = so;
        t4_iterate(find_offload_adapter, &fa);
-       if (fa.sc == NULL)
-               return (EINVAL);
+       if (fa.sc == NULL) {
+               error = EINVAL;
+               goto out;
+       }
        icc->sc = fa.sc;
 
        max_rx_pdu_len = ISCSI_BHS_SIZE + ic->ic_max_recv_data_segment_length;
@@ -954,7 +961,8 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd)
        tp = intotcpcb(inp);
        if (inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) {
                INP_WUNLOCK(inp);
-               return (EBUSY);
+               error = ENOTCONN;
+               goto out;
        }
 
        /*
@@ -968,11 +976,11 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd)
 
        if (ulp_mode(toep) != ULP_MODE_NONE) {
                INP_WUNLOCK(inp);
-               return (EINVAL);
+               error = EINVAL;
+               goto out;
        }
 
        icc->toep = toep;
-       icc->cwt = cxgbei_select_worker_thread(icc);
 
        icc->ulp_submode = 0;
        if (ic->ic_header_crc32c)
@@ -996,7 +1004,21 @@ icl_cxgbei_conn_handoff(struct icl_conn *ic, int fd)
        set_ulp_mode_iscsi(icc->sc, toep, icc->ulp_submode);
        INP_WUNLOCK(inp);
 
-       return (icl_cxgbei_setsockopt(ic, so, max_tx_pdu_len, max_rx_pdu_len));
+       error = kthread_add(icl_cxgbei_tx_main, icc, NULL, &icc->tx_thread, 0,
+           0, "%stx (cxgbei)", ic->ic_name);
+       if (error != 0)
+               goto out;
+
+       error = kthread_add(icl_cxgbei_rx_main, icc, NULL, &icc->rx_thread, 0,
+           0, "%srx (cxgbei)", ic->ic_name);
+       if (error != 0)
+               goto out;
+
+       error = icl_cxgbei_setsockopt(ic, so, max_tx_pdu_len, max_rx_pdu_len);
+out:
+       if (error != 0)
+               icl_cxgbei_conn_close(ic);
+       return (error);
 }
 
 void
@@ -1027,60 +1049,58 @@ icl_cxgbei_conn_close(struct icl_conn *ic)
            ("destroying session with %d outstanding PDUs",
             ic->ic_outstanding_pdus));
 #endif
-       ICL_CONN_UNLOCK(ic);
 
        CTR3(KTR_CXGBE, "%s: tid %d, icc %p", __func__, toep ? toep->tid : -1,
            icc);
+
+       /*
+        * Wait for the transmit thread to stop processing
+        * this connection.
+        */
+       if (icc->tx_thread != NULL) {
+               wakeup(&icc->tx_active);
+               mtx_sleep(icc->tx_thread, ic->ic_lock, 0, "conclo", 0);
+       }
+
+       /* Discard PDUs queued for TX. */
+       while (!STAILQ_EMPTY(&icc->sent_pdus)) {
+               ip = STAILQ_FIRST(&icc->sent_pdus);
+               STAILQ_REMOVE_HEAD(&icc->sent_pdus, ip_next);
+               icl_cxgbei_pdu_done(ip, ENOTCONN);
+       }
+       ICL_CONN_UNLOCK(ic);
+
        inp = sotoinpcb(so);
        sb = &so->so_rcv;
+
+       /*
+        * Wait for the receive thread to stop processing this
+        * connection.
+        */
+       SOCKBUF_LOCK(sb);
+       if (icc->rx_thread != NULL) {
+               icc->rx_exiting = true;
+               wakeup(&icc->rx_active);
+               mtx_sleep(icc->rx_thread, SOCKBUF_MTX(sb), 0, "conclo", 0);
+       }
+
+       /*
+        * Discard received PDUs not passed to the iSCSI layer.
+        */
+       while (!STAILQ_EMPTY(&icc->rcvd_pdus)) {
+               ip = STAILQ_FIRST(&icc->rcvd_pdus);
+               STAILQ_REMOVE_HEAD(&icc->rcvd_pdus, ip_next);
+               icl_cxgbei_pdu_done(ip, ENOTCONN);
+       }
+       SOCKBUF_UNLOCK(sb);
+
        INP_WLOCK(inp);
        if (toep != NULL) {     /* NULL if connection was never offloaded. */
                toep->ulpcb = NULL;
 
-               /*
-                * Wait for the cwt threads to stop processing this
-                * connection for transmit.
-                */
-               while (icc->tx_active)
-                       rw_sleep(inp, &inp->inp_lock, 0, "conclo", 1);
-
-               /* Discard PDUs queued for TX. */
-               while (!STAILQ_EMPTY(&icc->sent_pdus)) {
-                       ip = STAILQ_FIRST(&icc->sent_pdus);
-                       STAILQ_REMOVE_HEAD(&icc->sent_pdus, ip_next);
-                       icl_cxgbei_pdu_done(ip, ENOTCONN);
-               }
+               /* Discard mbufs queued for TX. */
                mbufq_drain(&toep->ulp_pduq);
 
-               /*
-                * Wait for the cwt threads to stop processing this
-                * connection for receive.
-                */
-               SOCKBUF_LOCK(sb);
-               if (icc->rx_active) {
-                       volatile bool *p = &icc->rx_active;
-
-                       SOCKBUF_UNLOCK(sb);
-                       INP_WUNLOCK(inp);
-
-                       while (*p)
-                               pause("conclo", 1);
-
-                       INP_WLOCK(inp);
-                       SOCKBUF_LOCK(sb);
-               }
-
-               /*
-                * Discard received PDUs not passed to the iSCSI
-                * layer.
-                */
-               while (!STAILQ_EMPTY(&icc->rcvd_pdus)) {
-                       ip = STAILQ_FIRST(&icc->rcvd_pdus);
-                       STAILQ_REMOVE_HEAD(&icc->rcvd_pdus, ip_next);
-                       icl_cxgbei_pdu_done(ip, ENOTCONN);
-               }
-               SOCKBUF_UNLOCK(sb);
-
                /*
                 * Grab a reference to use when waiting for the final
                 * CPL to be received.  If toep->inp is NULL, then

Reply via email to