The branch main has been updated by glebius:

URL: https://cgit.FreeBSD.org/src/commit/?id=d80a97def9a1db6f07f5d2e68f7ad62b27918947

commit d80a97def9a1db6f07f5d2e68f7ad62b27918947
Author:     Gleb Smirnoff <gleb...@freebsd.org>
AuthorDate: 2024-04-08 20:16:51 +0000
Commit:     Gleb Smirnoff <gleb...@freebsd.org>
CommitDate: 2024-04-08 20:16:51 +0000

    unix: new implementation of unix/stream & unix/seqpacket
    
    Provide protocol-specific pr_sosend and pr_soreceive for PF_UNIX
    SOCK_STREAM sockets and implement SOCK_SEQPACKET sockets as an extension
    of SOCK_STREAM.  The change meets three goals: get rid of unix(4)-specific
    stuff in the generic socket code, provide faster and more robust
    unix/stream sockets, and bring unix/seqpacket much closer to the
    specification.  Highlights follow:
    
    - The send buffer is now truly bypassed.  Previously it was always empty,
    but send(2) still needed to acquire its lock and do a variety of tricks
    to be woken up at the right time while sleeping on it.  Now the only two
    things we care about in the send buffer are the I/O sx(9) lock that
    serializes operations and the value of so_snd.sb_hiwat, which we can read
    without obtaining a lock.  The sleep of a send(2) happens on the mutex of
    the peer's receive buffer.  A bulk send/recv of data with large socket
    buffers will make both syscalls just bounce between owning the receive
    buffer lock and copyin(9)/copyout(9); no other locks are involved.
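
    Not part of the commit, just a minimal userland sketch of the bulk
    transfer case described above, using only standard APIs: with a large
    receive buffer, the two processes below should, inside the kernel,
    mostly alternate between owning the single receive buffer lock and
    copyin(9)/copyout(9).

        /* Hypothetical demo, not from the commit: bulk unix/stream copy. */
        #include <sys/socket.h>
        #include <sys/wait.h>
        #include <err.h>
        #include <string.h>
        #include <unistd.h>

        int
        main(void)
        {
                int fds[2], rcvbuf = 1024 * 1024;
                char buf[64 * 1024];
                ssize_t n;
                pid_t pid;

                if (socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) == -1)
                        err(1, "socketpair");
                /* A large receive buffer lets each wakeup move more data. */
                if (setsockopt(fds[1], SOL_SOCKET, SO_RCVBUF, &rcvbuf,
                    sizeof(rcvbuf)) == -1)
                        err(1, "setsockopt");
                if ((pid = fork()) == -1)
                        err(1, "fork");
                if (pid == 0) {
                        /* Child: drain the stream until EOF. */
                        close(fds[0]);
                        while ((n = read(fds[1], buf, sizeof(buf))) > 0)
                                ;
                        _exit(n == 0 ? 0 : 1);
                }
                /* Parent: push a bulk stream of data, then signal EOF. */
                close(fds[1]);
                memset(buf, 'x', sizeof(buf));
                for (int i = 0; i < 1024; i++)
                        if (write(fds[0], buf, sizeof(buf)) == -1)
                                err(1, "write");
                close(fds[0]);
                (void)waitpid(pid, NULL, 0);
                return (0);
        }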
    
    - The implementation uses the new mchain structure to manipulate mbuf
    chains.  Note that this required converting two functions shared with
    unix/dgram to mchain: unp_internalize() and unp_addsockcred(), as well as
    adding a new shared one, uipc_reset_kernel_mbuf().  This induces some
    non-functional changes in the unix/dgram code as well.  There is room for
    improvement here, as right now it is a mix of mchain and manually managed
    mbuf chains.
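
    The mchain definition itself is not part of the hunks below.  Purely as
    an illustrative userland analogue, with all names hypothetical and only
    the roles of mc_q, mc_len and mc_mlen mirrored from how the diff uses
    them, the idea is a buffer queue with running char and memory counts, so
    that concatenation keeps the accounting O(1).

        /* Hypothetical analogue, not from the commit. */
        #include <sys/queue.h>
        #include <assert.h>
        #include <stdlib.h>

        struct buf {
                STAILQ_ENTRY(buf) link;
                size_t len;
        };

        /* Analogue of struct mchain: a queue plus char/memory counters. */
        struct chain {
                STAILQ_HEAD(, buf) q;   /* like mc_q */
                size_t len;             /* chars in chain, like mc_len */
                size_t mlen;            /* memory charged, like mc_mlen */
        };

        #define CHAIN_INITIALIZER(c)                                    \
                (struct chain){ .q = STAILQ_HEAD_INITIALIZER((c)->q) }

        /* Analogue of mc_concat(): move src's buffers to the tail of dst. */
        static void
        chain_concat(struct chain *dst, struct chain *src)
        {
                STAILQ_CONCAT(&dst->q, &src->q);
                dst->len += src->len;
                dst->mlen += src->mlen;
                src->len = src->mlen = 0;
        }

        int
        main(void)
        {
                struct chain a = CHAIN_INITIALIZER(&a),
                    b = CHAIN_INITIALIZER(&b);
                struct buf *bp = malloc(sizeof(*bp));

                assert(bp != NULL);
                bp->len = 100;
                STAILQ_INSERT_TAIL(&b.q, bp, link);
                b.len += bp->len;
                b.mlen += sizeof(*bp);
                chain_concat(&a, &b);
                assert(a.len == 100 && STAILQ_EMPTY(&b.q));
                free(STAILQ_FIRST(&a.q));
                return (0);
        }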
    
    - unix/seqpacket, previously marked as PR_ADDR & PR_ATOMIC and thus
    treated as a datagram socket by the generic socket code, now becomes a
    true stream socket with record markers.
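
    Also not from the commit: a tiny userland check of the record semantics
    this item promises, assuming MSG_EOR is reported at the end of each
    record as the specification describes.

        /* Hypothetical check, not from the commit. */
        #include <sys/socket.h>
        #include <sys/uio.h>
        #include <assert.h>

        int
        main(void)
        {
                int fds[2];
                char buf[128];
                struct iovec iov = { .iov_base = buf,
                    .iov_len = sizeof(buf) };
                struct msghdr hdr = { .msg_iov = &iov, .msg_iovlen = 1 };

                assert(socketpair(PF_LOCAL, SOCK_SEQPACKET, 0, fds) == 0);
                assert(send(fds[0], "first", 5, 0) == 5);
                assert(send(fds[0], "second record", 13, 0) == 13);
                /* Each recvmsg(2) returns one record, ending in MSG_EOR. */
                assert(recvmsg(fds[1], &hdr, 0) == 5);
                assert(hdr.msg_flags & MSG_EOR);
                assert(recvmsg(fds[1], &hdr, 0) == 13);
                assert(hdr.msg_flags & MSG_EOR);
                return (0);
        }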
    
    - unix/stream loses sendfile(2) support.  This can be brought back, but
    requires some work.  Let's first see if there is any interest in this
    feature beyond the purely academic.
    
    Reviewed by:            markj, tuexen
    Differential Revision:  https://reviews.freebsd.org/D44151
---
 sys/kern/uipc_usrreq.c | 956 +++++++++++++++++++++++++++++++++----------------
 sys/sys/sockbuf.h      |   7 +
 2 files changed, 645 insertions(+), 318 deletions(-)

diff --git a/sys/kern/uipc_usrreq.c b/sys/kern/uipc_usrreq.c
index 6e83e2be6f05..1b9416269696 100644
--- a/sys/kern/uipc_usrreq.c
+++ b/sys/kern/uipc_usrreq.c
@@ -5,7 +5,7 @@
  *     The Regents of the University of California. All Rights Reserved.
  * Copyright (c) 2004-2009 Robert N. M. Watson All Rights Reserved.
  * Copyright (c) 2018 Matthew Macy
- * Copyright (c) 2022 Gleb Smirnoff <gleb...@freebsd.org>
+ * Copyright (c) 2022-2024 Gleb Smirnoff <gleb...@freebsd.org>
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -141,11 +141,14 @@ static struct timeout_task unp_gc_task;
 static struct task     unp_defer_task;
 
 /*
- * Both send and receive buffers are allocated PIPSIZ bytes of buffering for
- * stream sockets, although the total for sender and receiver is actually
- * only PIPSIZ.
+ * SOCK_STREAM and SOCK_SEQPACKET unix(4) sockets fully bypass the send
+ * buffer; however, the notion of a send buffer still makes sense with them.
+ * Its size is the amount of space that a send(2) syscall may copyin(9)
+ * before checking against the receive buffer of the peer.  Although this
+ * buffer is not linked anywhere and is only pointed to by a stack variable,
+ * it effectively is a buffer that needs to be sized.
  *
- * Datagram sockets really use the sendspace as the maximum datagram size,
+ * SOCK_DGRAM sockets really use the sendspace as the maximum datagram size,
  * and don't really want to reserve the sendspace.  Their recvspace should be
  * large enough for at least one max-size datagram plus address.
  */
@@ -156,7 +159,7 @@ static u_long       unpst_sendspace = PIPSIZ;
 static u_long  unpst_recvspace = PIPSIZ;
 static u_long  unpdg_maxdgram = 8*1024;        /* support 8KB syslog msgs */
 static u_long  unpdg_recvspace = 16*1024;
-static u_long  unpsp_sendspace = PIPSIZ;       /* really max datagram size */
+static u_long  unpsp_sendspace = PIPSIZ;
 static u_long  unpsp_recvspace = PIPSIZ;
 
 static SYSCTL_NODE(_net, PF_LOCAL, local, CTLFLAG_RW | CTLFLAG_MPSAFE, 0,
@@ -300,13 +303,12 @@ static void       unp_gc(__unused void *, int);
 static void    unp_scan(struct mbuf *, void (*)(struct filedescent **, int));
 static void    unp_discard(struct file *);
 static void    unp_freerights(struct filedescent **, int);
-static int     unp_internalize(struct mbuf **, struct thread *,
-                   struct mbuf **, u_int *, u_int *);
+static int     unp_internalize(struct mbuf *, struct mchain *,
+                   struct thread *);
 static void    unp_internalize_fp(struct file *);
 static int     unp_externalize(struct mbuf *, struct mbuf **, int);
 static int     unp_externalize_fp(struct file *);
-static struct mbuf     *unp_addsockcred(struct thread *, struct mbuf *,
-                   int, struct mbuf **, u_int *, u_int *);
+static void    unp_addsockcred(struct thread *, struct mchain *, int);
 static void    unp_process_defers(void * __unused, int);
 
 static void
@@ -449,6 +451,7 @@ uipc_attach(struct socket *so, int proto, struct thread *td)
                case SOCK_STREAM:
                        sendspace = unpst_sendspace;
                        recvspace = unpst_recvspace;
+                       STAILQ_INIT(&so->so_rcv.sb_mbq);
                        break;
 
                case SOCK_DGRAM:
@@ -466,6 +469,7 @@ uipc_attach(struct socket *so, int proto, struct thread *td)
                case SOCK_SEQPACKET:
                        sendspace = unpsp_sendspace;
                        recvspace = unpsp_recvspace;
+                       STAILQ_INIT(&so->so_rcv.sb_mbq);
                        break;
 
                default:
@@ -797,6 +801,10 @@ uipc_detach(struct socket *so)
                taskqueue_enqueue_timeout(taskqueue_thread, &unp_gc_task, -1);
 
        switch (so->so_type) {
+       case SOCK_STREAM:
+       case SOCK_SEQPACKET:
+               MPASS(STAILQ_EMPTY(&so->so_rcv.sb_mbq));
+               break;
        case SOCK_DGRAM:
                /*
                 * Everything should have been unlinked/freed by unp_dispose()
@@ -852,6 +860,10 @@ uipc_listen(struct socket *so, int backlog, struct thread *td)
        error = solisten_proto_check(so);
        if (error == 0) {
                cru2xt(td, &unp->unp_peercred);
+               (void)chgsbsize(so->so_cred->cr_uidinfo, &so->so_snd.sb_hiwat,
+                   0, RLIM_INFINITY);
+               (void)chgsbsize(so->so_cred->cr_uidinfo, &so->so_rcv.sb_hiwat,
+                   0, RLIM_INFINITY);
                solisten_proto(so, backlog);
        }
        SOCK_UNLOCK(so);
@@ -885,187 +897,563 @@ uipc_peeraddr(struct socket *so, struct sockaddr *ret)
        return (0);
 }
 
-static int
-uipc_rcvd(struct socket *so, int flags)
+/*
+ * A pr_sosend() call with an mbuf instead of a uio comes from a kernel
+ * thread: NFS, netgraph(4) and other subsystems can call into socket code.
+ * This function conditions the mbuf so that it can safely be put onto a
+ * socket buffer, and calculates its char count and mbuf count.
+ *
+ * Note: we don't support receiving control data from a kernel thread.  Our
+ * pr_sosend methods have MPASS() to check that.  This may change.
+ */
+static void
+uipc_reset_kernel_mbuf(struct mbuf *m, struct mchain *mc)
 {
-       struct unpcb *unp, *unp2;
-       struct socket *so2;
-       u_int mbcnt, sbcc;
 
-       unp = sotounpcb(so);
-       KASSERT(unp != NULL, ("%s: unp == NULL", __func__));
-       KASSERT(so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET,
-           ("%s: socktype %d", __func__, so->so_type));
+       M_ASSERTPKTHDR(m);
 
-       /*
-        * Adjust backpressure on sender and wakeup any waiting to write.
-        *
-        * The unp lock is acquired to maintain the validity of the unp_conn
-        * pointer; no lock on unp2 is required as unp2->unp_socket will be
-        * static as long as we don't permit unp2 to disconnect from unp,
-        * which is prevented by the lock on unp.  We cache values from
-        * so_rcv to avoid holding the so_rcv lock over the entire
-        * transaction on the remote so_snd.
-        */
-       SOCKBUF_LOCK(&so->so_rcv);
-       mbcnt = so->so_rcv.sb_mbcnt;
-       sbcc = sbavail(&so->so_rcv);
-       SOCKBUF_UNLOCK(&so->so_rcv);
-       /*
-        * There is a benign race condition at this point.  If we're planning to
-        * clear SB_STOP, but uipc_send is called on the connected socket at
-        * this instant, it might add data to the sockbuf and set SB_STOP.  Then
-        * we would erroneously clear SB_STOP below, even though the sockbuf is
-        * full.  The race is benign because the only ill effect is to allow the
-        * sockbuf to exceed its size limit, and the size limits are not
-        * strictly guaranteed anyway.
-        */
-       UNP_PCB_LOCK(unp);
-       unp2 = unp->unp_conn;
-       if (unp2 == NULL) {
-               UNP_PCB_UNLOCK(unp);
-               return (0);
+       m_clrprotoflags(m);
+       m_tag_delete_chain(m, NULL);
+       m->m_pkthdr.rcvif = NULL;
+       m->m_pkthdr.flowid = 0;
+       m->m_pkthdr.csum_flags = 0;
+       m->m_pkthdr.fibnum = 0;
+       m->m_pkthdr.rsstype = 0;
+
+       mc_init_m(mc, m);
+       MPASS(m->m_pkthdr.len == mc->mc_len);
+}
+
+#ifdef SOCKBUF_DEBUG
+static inline void
+uipc_stream_sbcheck(struct sockbuf *sb)
+{
+       struct mbuf *d;
+       u_int dcc, dctl, dmbcnt;
+
+       dcc = dctl = dmbcnt = 0;
+       STAILQ_FOREACH(d, &sb->sb_mbq, m_stailq) {
+               if (d->m_type == MT_CONTROL)
+                       dctl += d->m_len;
+               else if (d->m_type == MT_DATA)
+                       dcc +=  d->m_len;
+               else
+                       MPASS(0);
+               dmbcnt += MSIZE;
+               if (d->m_flags & M_EXT)
+                       dmbcnt += d->m_ext.ext_size;
+               if (d->m_stailq.stqe_next == NULL)
+                       MPASS(sb->sb_mbq.stqh_last == &d->m_stailq.stqe_next);
        }
-       so2 = unp2->unp_socket;
-       SOCKBUF_LOCK(&so2->so_snd);
-       if (sbcc < so2->so_snd.sb_hiwat && mbcnt < so2->so_snd.sb_mbmax)
-               so2->so_snd.sb_flags &= ~SB_STOP;
-       sowwakeup_locked(so2);
-       UNP_PCB_UNLOCK(unp);
-       return (0);
+       MPASS(dcc == sb->sb_acc);
+       MPASS(dcc == sb->sb_ccc);
+       MPASS(dctl == sb->sb_ctl);
+       MPASS(dmbcnt == sb->sb_mbcnt);
+}
+#define        UIPC_STREAM_SBCHECK(sb) uipc_stream_sbcheck(sb)
+#else
+#define        UIPC_STREAM_SBCHECK(sb) do {} while (0)
+#endif
+
+/*
+ * uipc_stream_sbspace() returns how much a writer can send, limited by char
+ * count or mbuf memory use, whichever limit is hit first.
+ *
+ * XXXGL: sb_mbcnt may overcommit sb_mbmax in case a previous write observed
+ * 'space < mbspace', but the mchain allocated to hold 'space' bytes of data
+ * ended up with 'mc_mlen > mbspace'.  A typical scenario would be a full
+ * buffer with a writer trying to push in a large write, and a slow reader
+ * that reads just a few bytes at a time.  In that case the writer will keep
+ * creating new mbufs with mc_split().  These mbufs will carry few chars, but
+ * will all point at the same cluster, each adding the cluster size to
+ * sb_mbcnt.  This means we may count the same cluster many times, potentially
+ * underutilizing the socket buffer.  We aren't optimizing towards ineffective
+ * readers.  The classic socket buffer had the same "feature".
+ */
+static inline u_int
+uipc_stream_sbspace(struct sockbuf *sb)
+{
+       u_int space, mbspace;
+
+       MPASS(sb->sb_hiwat >= sb->sb_ccc + sb->sb_ctl);
+       space = sb->sb_hiwat - sb->sb_ccc - sb->sb_ctl;
+       if (__predict_true(sb->sb_mbmax >= sb->sb_mbcnt))
+               mbspace = sb->sb_mbmax - sb->sb_mbcnt;
+       else
+               return (0);
+
+       return (min(space, mbspace));
 }
 
 static int
-uipc_send(struct socket *so, int flags, struct mbuf *m, struct sockaddr *nam,
-    struct mbuf *control, struct thread *td)
+uipc_sosend_stream_or_seqpacket(struct socket *so, struct sockaddr *addr,
+    struct uio *uio, struct mbuf *m, struct mbuf *c, int flags,
+    struct thread *td)
 {
        struct unpcb *unp, *unp2;
        struct socket *so2;
-       u_int mbcnt, sbcc;
+       struct sockbuf *sb;
+       struct mchain mc, cmc;
+       ssize_t resid, sent;
+       bool nonblock, eor;
        int error;
 
-       unp = sotounpcb(so);
-       KASSERT(unp != NULL, ("%s: unp == NULL", __func__));
-       KASSERT(so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET,
-           ("%s: socktype %d", __func__, so->so_type));
+       MPASS((uio != NULL && m == NULL) || (m != NULL && uio == NULL));
+       MPASS(m == NULL || c == NULL);
 
-       error = 0;
-       if (flags & PRUS_OOB) {
+       if (__predict_false(flags & MSG_OOB)) {
                error = EOPNOTSUPP;
-               goto release;
+               goto out;
        }
-       if (control != NULL &&
-           (error = unp_internalize(&control, td, NULL, NULL, NULL)))
-               goto release;
 
-       unp2 = NULL;
-       if ((so->so_state & SS_ISCONNECTED) == 0) {
-               if (nam != NULL) {
-                       if ((error = unp_connect(so, nam, td)) != 0)
-                               goto out;
-               } else {
-                       error = ENOTCONN;
+       nonblock = (so->so_state & SS_NBIO) ||
+           (flags & (MSG_DONTWAIT | MSG_NBIO));
+       eor = flags & MSG_EOR;
+
+       mc = MCHAIN_INITIALIZER(&mc);
+       cmc = MCHAIN_INITIALIZER(&cmc);
+       sent = 0;
+
+       if (m == NULL) {
+               if (c != NULL && (error = unp_internalize(c, &cmc, td)))
                        goto out;
-               }
-       }
+               /*
+                * Optimization for the case when our send fits into the
+                * receive buffer: do the copyin before taking any locks, sized
+                * to our send buffer.  Later copyins will also take into
+                * account space in the peer's receive buffer.
+                */
+               resid = uio->uio_resid;
+               error = mc_uiotomc(&mc, uio, so->so_snd.sb_hiwat, 0, M_WAITOK,
+                   eor ? M_EOR : 0);
+               if (__predict_false(error))
+                       goto out2;
+       } else
+               uipc_reset_kernel_mbuf(m, &mc);
+
+       error = SOCK_IO_SEND_LOCK(so, SBLOCKWAIT(flags));
+       if (error)
+               goto out2;
 
+       unp = sotounpcb(so);
        UNP_PCB_LOCK(unp);
-       if ((unp2 = unp_pcb_lock_peer(unp)) == NULL) {
+       unp2 = unp_pcb_lock_peer(unp);
+       if (__predict_false(so->so_error != 0)) {
+               error = so->so_error;
+               so->so_error = 0;
                UNP_PCB_UNLOCK(unp);
-               error = ENOTCONN;
-               goto out;
-       } else if (so->so_snd.sb_state & SBS_CANTSENDMORE) {
-               unp_pcb_unlock_pair(unp, unp2);
-               error = EPIPE;
-               goto out;
+               if (unp2 != NULL)
+                       UNP_PCB_UNLOCK(unp2);
+               goto out3;
        }
-       UNP_PCB_UNLOCK(unp);
-       if ((so2 = unp2->unp_socket) == NULL) {
-               UNP_PCB_UNLOCK(unp2);
-               error = ENOTCONN;
-               goto out;
+       if (__predict_false(unp2 == NULL)) {
+               /*
+                * Return a different error code for a previously connected
+                * socket and a never-connected one.  SS_ISDISCONNECTED is set
+                * in unp_soisdisconnected() and is synchronized by the pcb
+                * lock.
+                */
+               error = so->so_state & SS_ISDISCONNECTED ? EPIPE : ENOTCONN;
+               UNP_PCB_UNLOCK(unp);
+               goto out3;
        }
-       SOCKBUF_LOCK(&so2->so_rcv);
+       UNP_PCB_UNLOCK(unp);
+
        if (unp2->unp_flags & UNP_WANTCRED_MASK) {
                /*
                 * Credentials are passed only once on SOCK_STREAM and
                 * SOCK_SEQPACKET (LOCAL_CREDS => WANTCRED_ONESHOT), or
                 * forever (LOCAL_CREDS_PERSISTENT => WANTCRED_ALWAYS).
                 */
-               control = unp_addsockcred(td, control, unp2->unp_flags, NULL,
-                   NULL, NULL);
+               unp_addsockcred(td, &cmc, unp2->unp_flags);
                unp2->unp_flags &= ~UNP_WANTCRED_ONESHOT;
        }
 
        /*
-        * Send to paired receive port and wake up readers.  Don't
-        * check for space available in the receive buffer if we're
-        * attaching ancillary data; Unix domain sockets only check
-        * for space in the sending sockbuf, and that check is
-        * performed one level up the stack.  At that level we cannot
-        * precisely account for the amount of buffer space used
-        * (e.g., because control messages are not yet internalized).
+        * Cycle through the data to send and available space in the peer's
+        * receive buffer.  Put a reference on the peer socket, so that it
+        * doesn't get freed while we sbwait().  If the peer goes away, we
+        * will observe SBS_CANTRCVMORE and our sorele() will finalize the
+        * peer's socket destruction.
         */
-       switch (so->so_type) {
-       case SOCK_STREAM:
-               if (control != NULL) {
-                       sbappendcontrol_locked(&so2->so_rcv,
-                           m->m_len > 0 ?  m : NULL, control, flags);
-                       control = NULL;
-               } else
-                       sbappend_locked(&so2->so_rcv, m, flags);
-               break;
+       so2 = unp2->unp_socket;
+       soref(so2);
+       UNP_PCB_UNLOCK(unp2);
+       sb = &so2->so_rcv;
+       while (mc.mc_len + cmc.mc_len > 0) {
+               struct mchain mcnext = MCHAIN_INITIALIZER(&mcnext);
+               u_int space;
 
-       case SOCK_SEQPACKET:
-               if (sbappendaddr_nospacecheck_locked(&so2->so_rcv,
-                   &sun_noname, m, control))
-                       control = NULL;
-               break;
+               SOCK_RECVBUF_LOCK(so2);
+restart:
+               UIPC_STREAM_SBCHECK(sb);
+               if (__predict_false(cmc.mc_len > sb->sb_hiwat)) {
+                       SOCK_RECVBUF_UNLOCK(so2);
+                       error = EMSGSIZE;
+                       goto out4;
+               }
+               if (__predict_false(sb->sb_state & SBS_CANTRCVMORE)) {
+                       SOCK_RECVBUF_UNLOCK(so2);
+                       error = EPIPE;
+                       goto out4;
+               }
+               /*
+                * Wait on the peer socket receive buffer until we have enough
+                * space to put at least control.  The data is a stream and can
+                * be put partially, but control is really a datagram.
+                */
+               space = uipc_stream_sbspace(sb);
+               if (space < sb->sb_lowat || space < cmc.mc_len) {
+                       if (nonblock) {
+                               SOCK_RECVBUF_UNLOCK(so2);
+                               error = EWOULDBLOCK;
+                               goto out4;
+                       }
+                       if ((error = sbwait(so2, SO_RCV)) != 0) {
+                               SOCK_RECVBUF_UNLOCK(so2);
+                               goto out4;
+                       } else
+                               goto restart;
+               }
+               MPASS(space >= cmc.mc_len);
+               space -= cmc.mc_len;
+               if (space == 0) {
+                       /* There is space only to send control. */
+                       MPASS(!STAILQ_EMPTY(&cmc.mc_q));
+                       mcnext = mc;
+                       mc = MCHAIN_INITIALIZER(&mc);
+               } else if (space < mc.mc_len) {
+                       /* Not enough space. */
+                       if (__predict_false(mc_split(&mc, &mcnext, space,
+                           M_NOWAIT) == ENOMEM)) {
+                               /*
+                                * If allocation failed use M_WAITOK and merge
+                                * the chain back.  Next time mc_split() will
+                                * easily split at the same place.  Only if we
+                                * race with setsockopt(SO_RCVBUF) shrinking
+                                * sb_hiwat can this happen more than once.
+                                */
+                               SOCK_RECVBUF_UNLOCK(so2);
+                               (void)mc_split(&mc, &mcnext, space, M_WAITOK);
+                               mc_concat(&mc, &mcnext);
+                               SOCK_RECVBUF_LOCK(so2);
+                               goto restart;
+                       }
+                       MPASS(mc.mc_len == space);
+               }
+               if (!STAILQ_EMPTY(&cmc.mc_q)) {
+                       STAILQ_CONCAT(&sb->sb_mbq, &cmc.mc_q);
+                       sb->sb_ctl += cmc.mc_len;
+                       sb->sb_mbcnt += cmc.mc_mlen;
+                       cmc.mc_len = 0;
+               }
+               sent += mc.mc_len;
+               sb->sb_acc += mc.mc_len;
+               sb->sb_ccc += mc.mc_len;
+               sb->sb_mbcnt += mc.mc_mlen;
+               STAILQ_CONCAT(&sb->sb_mbq, &mc.mc_q);
+               UIPC_STREAM_SBCHECK(sb);
+               space = uipc_stream_sbspace(sb);
+               sorwakeup_locked(so2);
+               mc = mcnext;
+               if (STAILQ_EMPTY(&mc.mc_q) &&
+                   uio != NULL && uio->uio_resid > 0) {
+                        * Copyin the sum of the peer's receive buffer space
+                        * and our sb_hiwat, which is our virtual send buffer
+                        * size.  See the comment above the unpst_sendspace
+                        * declaration.  We read sb_hiwat locklessly because
+                        * a) we don't care about an application that races
+                        * send(2) and setsockopt(2) internally, and b) for an
+                        * application that does this in sequence we will see
+                        * the correct value, since sbsetopt() uses the buffer
+                        * lock and we have already acquired it at least once.
+                        * and we also have already acquired it at least once.
+                        */
+                       error = mc_uiotomc(&mc, uio, space +
+                           atomic_load_int(&so->so_snd.sb_hiwat), 0, M_WAITOK,
+                           eor ? M_EOR : 0);
+                       if (__predict_false(error))
+                               goto out4;
+               }
        }
 
-       mbcnt = so2->so_rcv.sb_mbcnt;
-       sbcc = sbavail(&so2->so_rcv);
-       if (sbcc)
-               sorwakeup_locked(so2);
-       else
-               SOCKBUF_UNLOCK(&so2->so_rcv);
+       MPASS(STAILQ_EMPTY(&mc.mc_q));
 
-       /*
-        * The PCB lock on unp2 protects the SB_STOP flag.  Without it,
-        * it would be possible for uipc_rcvd to be called at this
-        * point, drain the receiving sockbuf, clear SB_STOP, and then
-        * we would set SB_STOP below.  That could lead to an empty
-        * sockbuf having SB_STOP set
-        */
-       SOCKBUF_LOCK(&so->so_snd);
-       if (sbcc >= so->so_snd.sb_hiwat || mbcnt >= so->so_snd.sb_mbmax)
-               so->so_snd.sb_flags |= SB_STOP;
-       SOCKBUF_UNLOCK(&so->so_snd);
-       UNP_PCB_UNLOCK(unp2);
-       m = NULL;
+       td->td_ru.ru_msgsnd++;
+out4:
+       sorele(so2);
+out3:
+       SOCK_IO_SEND_UNLOCK(so);
+out2:
+       if (!mc_empty(&cmc))
+               unp_scan(mc_first(&cmc), unp_freerights);
 out:
+       mc_freem(&mc);
+       mc_freem(&cmc);
+
+       if (uio != NULL)
+               uio->uio_resid = resid - sent;
+
+       return (error);
+}
+
+static int
+uipc_soreceive_stream_or_seqpacket(struct socket *so, struct sockaddr **psa,
+    struct uio *uio, struct mbuf **mp0, struct mbuf **controlp, int *flagsp)
+{
+       struct sockbuf *sb = &so->so_rcv;
+       struct mbuf *control, *m, *first, *last, *next;
+       u_int ctl, space, datalen, mbcnt, lastlen;
+       int error, flags;
+       bool nonblock, waitall, peek;
+
+       MPASS(mp0 == NULL);
+
+       if (psa != NULL)
+               *psa = NULL;
+       if (controlp != NULL)
+               *controlp = NULL;
+
+       flags = flagsp != NULL ? *flagsp : 0;
+       nonblock = (so->so_state & SS_NBIO) ||
+           (flags & (MSG_DONTWAIT | MSG_NBIO));
+       peek = flags & MSG_PEEK;
+       waitall = (flags & MSG_WAITALL) && !peek;
+
        /*
-        * PRUS_EOF is equivalent to pr_send followed by pr_shutdown.
+        * This check may fail only on a socket that never went through
+        * connect(2).  We can check this locklessly because: a) for a newborn
+        * socket we don't care about applications that may race internally
+        * between connect(2) and recv(2), and b) for a dying socket, if we
+        * miss an update by unp_soisdisconnected(), we would still get the
+        * check correct.  For a dying socket we would observe SBS_CANTRCVMORE
+        * later.
         */
-       if (flags & PRUS_EOF) {
-               UNP_PCB_LOCK(unp);
-               socantsendmore(so);
-               unp_shutdown(unp);
-               UNP_PCB_UNLOCK(unp);
+       if (__predict_false((atomic_load_short(&so->so_state) &
+           (SS_ISCONNECTED|SS_ISDISCONNECTED)) == 0))
+               return (ENOTCONN);
+
+       error = SOCK_IO_RECV_LOCK(so, SBLOCKWAIT(flags));
+       if (__predict_false(error))
+               return (error);
+
+restart:
+       SOCK_RECVBUF_LOCK(so);
+       UIPC_STREAM_SBCHECK(sb);
+       while (sb->sb_acc < sb->sb_lowat &&
+           (sb->sb_ctl == 0 || controlp == NULL)) {
+               if (so->so_error) {
+                       error = so->so_error;
+                       if (!peek)
+                               so->so_error = 0;
+                       SOCK_RECVBUF_UNLOCK(so);
+                       SOCK_IO_RECV_UNLOCK(so);
+                       return (error);
+               }
+               if (sb->sb_state & SBS_CANTRCVMORE) {
+                       SOCK_RECVBUF_UNLOCK(so);
+                       SOCK_IO_RECV_UNLOCK(so);
+                       return (0);
+               }
+               if (nonblock) {
+                       SOCK_RECVBUF_UNLOCK(so);
+                       SOCK_IO_RECV_UNLOCK(so);
+                       return (EWOULDBLOCK);
+               }
+               error = sbwait(so, SO_RCV);
+               if (error) {
+                       SOCK_RECVBUF_UNLOCK(so);
+                       SOCK_IO_RECV_UNLOCK(so);
+                       return (error);
+               }
        }
-       if (control != NULL && error != 0)
-               unp_scan(control, unp_freerights);
 
-release:
-       if (control != NULL)
-               m_freem(control);
+       MPASS(STAILQ_FIRST(&sb->sb_mbq));
+       MPASS(sb->sb_acc > 0 || sb->sb_ctl > 0);
+
+       mbcnt = 0;
+       ctl = 0;
+       first = STAILQ_FIRST(&sb->sb_mbq);
+       if (first->m_type == MT_CONTROL) {
+               control = first;
+               STAILQ_FOREACH_FROM(first, &sb->sb_mbq, m_stailq) {
+                       if (first->m_type != MT_CONTROL)
+                               break;
+                       ctl += first->m_len;
+                       mbcnt += MSIZE;
+                       if (first->m_flags & M_EXT)
+                               mbcnt += first->m_ext.ext_size;
+               }
+       } else
+               control = NULL;
+
        /*
-        * In case of PRUS_NOTREADY, uipc_ready() is responsible
-        * for freeing memory.
-        */   
-       if (m != NULL && (flags & PRUS_NOTREADY) == 0)
-               m_freem(m);
-       return (error);
+        * Find split point for the next copyout.  On exit from the loop:
+        * last == NULL - socket to be flushed
+        * last != NULL
+        *   lastlen > last->m_len - uio to be filled, last to be adjusted
+        *   lastlen == 0          - MT_CONTROL or M_EOR encountered
+        */
+       space = uio->uio_resid;
+       datalen = 0;
+       for (m = first, last = NULL; m != NULL; m = STAILQ_NEXT(m, m_stailq)) {
+               if (m->m_type != MT_DATA) {
+                       last = m;
+                       lastlen = 0;
+                       break;
+               }
+               if (space >= m->m_len) {
+                       space -= m->m_len;
+                       datalen += m->m_len;
+                       mbcnt += MSIZE;
+                       if (m->m_flags & M_EXT)
+                               mbcnt += m->m_ext.ext_size;
+                       if (m->m_flags & M_EOR) {
+                               last = STAILQ_NEXT(m, m_stailq);
+                               lastlen = 0;
+                               flags |= MSG_EOR;
+                               break;
+                       }
+               } else {
+                       datalen += space;
+                       last = m;
+                       lastlen = space;
+                       break;
+               }
+       }
+
+       UIPC_STREAM_SBCHECK(sb);
+       if (!peek) {
+               if (last == NULL)
+                       STAILQ_INIT(&sb->sb_mbq);
+               else {
+                       STAILQ_FIRST(&sb->sb_mbq) = last;
+                       MPASS(last->m_len > lastlen);
+                       last->m_len -= lastlen;
+                       last->m_data += lastlen;
+               }
+               MPASS(sb->sb_acc >= datalen);
+               sb->sb_acc -= datalen;
+               sb->sb_ccc -= datalen;
+               MPASS(sb->sb_ctl >= ctl);
+               sb->sb_ctl -= ctl;
+               MPASS(sb->sb_mbcnt >= mbcnt);
+               sb->sb_mbcnt -= mbcnt;
+               UIPC_STREAM_SBCHECK(sb);
+               /* Mind the name.  This wakes the writer, not a reader. */
+               sorwakeup_locked(so);
+       } else
+               SOCK_RECVBUF_UNLOCK(so);
+
+       while (control != NULL && control->m_type == MT_CONTROL) {
+               if (!peek) {
+                       struct mbuf *c;
+
+                       /*
+                        * An unp_externalize() failure must abort the entire
+                        * read(2).  Such a failure should also free the
+                        * problematic control, so that the socket is not left
+                        * in a state where it can't progress forward with
+                        * reading.  The probability of such a failure is
+                        * really low, so it is fine that we need to perform a
+                        * pretty complex operation here to reconstruct the
+                        * buffer.
+                        * XXXGL: unp_externalize() used to be the
+                        * dom_externalize() KBI, and it frees the whole chain,
+                        * so we need to feed it mbufs one by one.
+                        */
+                       c = control;
+                       control = STAILQ_NEXT(c, m_stailq);
+                       STAILQ_NEXT(c, m_stailq) = NULL;
+                       error = unp_externalize(c, controlp, flags);
+                       if (__predict_false(error)) {
+                               SOCK_RECVBUF_LOCK(so);
+                               UIPC_STREAM_SBCHECK(sb);
+                               MPASS(!(sb->sb_state & SBS_CANTRCVMORE));
+                               /* XXXGL: STAILQ_PREPEND */
+                               if (STAILQ_EMPTY(&sb->sb_mbq))
+                                       STAILQ_INSERT_HEAD(&sb->sb_mbq,
+                                           control, m_stailq);
+                               else
+                                       STAILQ_FIRST(&sb->sb_mbq) = control;
+                               sb->sb_ctl = sb->sb_acc = sb->sb_ccc =
+                                   sb->sb_mbcnt = 0;
+                               STAILQ_FOREACH(m, &sb->sb_mbq, m_stailq) {
+                                       if (m->m_type == MT_DATA) {
+                                               sb->sb_acc += m->m_len;
+                                               sb->sb_ccc += m->m_len;
+                                       } else {
+                                               sb->sb_ctl += m->m_len;
+                                       }
+                                       sb->sb_mbcnt += MSIZE;
+                                       if (m->m_flags & M_EXT)
+                                               sb->sb_mbcnt +=
+                                                   m->m_ext.ext_size;
+                               }
+                               UIPC_STREAM_SBCHECK(sb);
+                               SOCK_RECVBUF_UNLOCK(so);
+                               SOCK_IO_RECV_UNLOCK(so);
+                               return (error);
+                       }
+                       if (controlp != NULL) {
+                               while (*controlp != NULL)
+                                       controlp = &(*controlp)->m_next;
+                       }
+               } else {
+                       /*
+                        * XXXGL
+                        *
+                        * In the MSG_PEEK case the control is not
+                        * externalized, which means we are leaking some kernel
+                        * pointers to userland.  They are useless to a
+                        * law-abiding application, but may be useful to
+                        * malware.  This is what the historical implementation
+                        * in soreceive_generic() did.  To be improved?
+                        */
+                       if (controlp != NULL) {
+                               *controlp = m_copym(control, 0, control->m_len,
+                                   M_WAITOK);
+                               controlp = &(*controlp)->m_next;
+                       }
+                       control = STAILQ_NEXT(control, m_stailq);
+               }
+       }
+
+       for (m = first; m != last; m = next) {
+               next = STAILQ_NEXT(m, m_stailq);
+               error = uiomove(mtod(m, char *), m->m_len, uio);
+               if (__predict_false(error)) {
+                       SOCK_IO_RECV_UNLOCK(so);
+                       if (!peek)
+                               for (; m != last; m = next) {
+                                       next = STAILQ_NEXT(m, m_stailq);
+                                       m_free(m);
+                               }
+                       return (error);
+               }
+               if (!peek)
+                       m_free(m);
+       }
+       if (last != NULL && lastlen > 0) {
+               if (!peek) {
+                       MPASS(!(m->m_flags & M_PKTHDR));
+                       MPASS(last->m_data - M_START(last) >= lastlen);
+                       error = uiomove(mtod(last, char *) - lastlen,
+                           lastlen, uio);
+               } else
+                       error = uiomove(mtod(last, char *), lastlen, uio);
+               if (__predict_false(error)) {
+                       SOCK_IO_RECV_UNLOCK(so);
+                       return (error);
+               }
+       }
+       if (waitall && !(flags & MSG_EOR) && uio->uio_resid > 0)
+               goto restart;
+       SOCK_IO_RECV_UNLOCK(so);
+
+       if (flagsp != NULL)
+               *flagsp |= flags;
+
+       uio->uio_td->td_ru.ru_msgrcv++;
+
+       return (0);
 }
 
 /* PF_UNIX/SOCK_DGRAM version of sbspace() */
@@ -1111,7 +1499,8 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
        const struct sockaddr *from;
        struct socket *so2;
        struct sockbuf *sb;
-       struct mbuf *f, *clast;
+       struct mchain cmc = MCHAIN_INITIALIZER(&cmc);
+       struct mbuf *f;
        u_int cc, ctl, mbcnt;
        u_int dcc __diagused, dctl __diagused, dmbcnt __diagused;
        int error;
@@ -1120,7 +1509,6 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
 
        error = 0;
        f = NULL;
-       ctl = 0;
 
        if (__predict_false(flags & MSG_OOB)) {
                error = EOPNOTSUPP;
@@ -1139,16 +1527,14 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
                f = m_gethdr(M_WAITOK, MT_SONAME);
                cc = m->m_pkthdr.len;
                mbcnt = MSIZE + m->m_pkthdr.memlen;
-               if (c != NULL &&
-                   (error = unp_internalize(&c, td, &clast, &ctl, &mbcnt)))
+               if (c != NULL && (error = unp_internalize(c, &cmc, td)))
                        goto out;
        } else {
-               /* pr_sosend() with mbuf usually is a kernel thread. */
-
-               M_ASSERTPKTHDR(m);
-               if (__predict_false(c != NULL))
-                       panic("%s: control from a kernel thread", __func__);
+               struct mchain mc;
 
+               uipc_reset_kernel_mbuf(m, &mc);
+               cc = mc.mc_len;
+               mbcnt = mc.mc_mlen;
                if (__predict_false(m->m_pkthdr.len > unpdg_maxdgram)) {
                        error = EMSGSIZE;
                        goto out;
@@ -1157,22 +1543,6 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
                        error = ENOBUFS;
                        goto out;
                }
-               /* Condition the foreign mbuf to our standards. */
-               m_clrprotoflags(m);
-               m_tag_delete_chain(m, NULL);
-               m->m_pkthdr.rcvif = NULL;
-               m->m_pkthdr.flowid = 0;
-               m->m_pkthdr.csum_flags = 0;
-               m->m_pkthdr.fibnum = 0;
-               m->m_pkthdr.rsstype = 0;
-
-               cc = m->m_pkthdr.len;
-               mbcnt = MSIZE;
-               for (struct mbuf *mb = m; mb != NULL; mb = mb->m_next) {
-                       mbcnt += MSIZE;
-                       if (mb->m_flags & M_EXT)
-                               mbcnt += mb->m_ext.ext_size;
-               }
        }
 
        unp = sotounpcb(so);
@@ -1224,8 +1594,7 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
        }
 
        if (unp2->unp_flags & UNP_WANTCRED_MASK)
-               c = unp_addsockcred(td, c, unp2->unp_flags, &clast, &ctl,
-                   &mbcnt);
+               unp_addsockcred(td, &cmc, unp2->unp_flags);
        if (unp->unp_addr != NULL)
                from = (struct sockaddr *)unp->unp_addr;
        else
@@ -1233,25 +1602,21 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
        f->m_len = from->sa_len;
        MPASS(from->sa_len <= MLEN);
        bcopy(from, mtod(f, void *), from->sa_len);
-       ctl += f->m_len;
 
        /*
         * Concatenate mbufs: from -> control -> data.
         * Save overall cc and mbcnt in "from" mbuf.
         */
-       if (c != NULL) {
-#ifdef INVARIANTS
-               struct mbuf *mc;
-
-               for (mc = c; mc->m_next != NULL; mc = mc->m_next);
-               MPASS(mc == clast);
-#endif
-               f->m_next = c;
-               clast->m_next = m;
-               c = NULL;
+       if (!STAILQ_EMPTY(&cmc.mc_q)) {
+               f->m_next = mc_first(&cmc);
+               mc_last(&cmc)->m_next = m;
+               /* XXXGL: This is dirty, as is the rollback after ENOBUFS. */
+               STAILQ_INIT(&cmc.mc_q);
        } else
                f->m_next = m;
        m = NULL;
+       ctl = f->m_len + cmc.mc_len;
+       mbcnt += cmc.mc_mlen;
 #ifdef INVARIANTS
        dcc = dctl = dmbcnt = 0;
        for (struct mbuf *mb = f; mb != NULL; mb = mb->m_next) {
@@ -1317,7 +1682,7 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
                soroverflow_locked(so2);
                error = ENOBUFS;
                if (f->m_next->m_type == MT_CONTROL) {
-                       c = f->m_next;
+                       STAILQ_FIRST(&cmc.mc_q) = f->m_next;
                        f->m_next = NULL;
                }
        }
@@ -1332,13 +1697,12 @@ uipc_sosend_dgram(struct socket *so, struct sockaddr *addr, struct uio *uio,
 out3:
        SOCK_IO_SEND_UNLOCK(so);
 out2:
-       if (c)
-               unp_scan(c, unp_freerights);
+       if (!mc_empty(&cmc))
+               unp_scan(mc_first(&cmc), unp_freerights);
 out:
        if (f)
                m_freem(f);
-       if (c)
-               m_freem(c);
+       mc_freem(&cmc);
        if (m)
                m_freem(m);
 
@@ -1579,6 +1943,7 @@ uipc_soreceive_dgram(struct socket *so, struct sockaddr **psa, struct uio *uio,
        return (0);
 }
*** 428 LINES SKIPPED ***

