This is an automated email from the ASF dual-hosted git repository.

dmeden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 68b6287e68 Add support for recvmmsg and UDP GRO (#9905)
68b6287e68 is described below

commit 68b6287e68f9322e78374f93876693691645ea99
Author: Damian Meden <[email protected]>
AuthorDate: Fri Jul 7 11:33:10 2023 +0100

    Add support for recvmmsg and UDP GRO (#9905)
    
    This PR adds support for UDP GRO when reading the socket using recvmsg or
    recvmmsg. It also adds support for recvmmsg when the OS provides it.
    
    A new record config is introduced for the GRO feature:
    proxy.config.udp.enable_gro
---
 configure.ac                              |   2 +-
 doc/admin-guide/files/records.yaml.en.rst |   5 +
 iocore/eventsystem/I_SocketManager.h      |   3 +
 iocore/eventsystem/P_UnixSocketManager.h  |  15 +
 iocore/net/I_UDPPacket.h                  |   2 +-
 iocore/net/P_UDPNet.h                     |  17 +-
 iocore/net/QUICPacketHandler_quiche.cc    |   2 +-
 iocore/net/UnixUDPNet.cc                  | 441 ++++++++++++++++++++++++------
 src/records/RecordsConfig.cc              |   3 +-
 9 files changed, 399 insertions(+), 91 deletions(-)

diff --git a/configure.ac b/configure.ac
index 555a8a7baa..725ad218d7 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1390,7 +1390,7 @@ AC_CHECK_FUNCS([clock_gettime kqueue epoll_ctl 
posix_fadvise posix_madvise posix
 AC_CHECK_FUNCS([port_create strlcpy strlcat sysconf sysctlbyname getpagesize])
 AC_CHECK_FUNCS([getreuid getresuid getresgid setreuid setresuid getpeereid 
getpeerucred])
 AC_CHECK_FUNCS([strsignal psignal psiginfo accept4])
-AC_CHECK_FUNCS([sendmmsg])
+AC_CHECK_FUNCS([sendmmsg recvmmsg])
 
 # Check for eventfd() and sys/eventfd.h (both must exist ...)
 AC_CHECK_HEADERS([sys/eventfd.h], [
diff --git a/doc/admin-guide/files/records.yaml.en.rst 
b/doc/admin-guide/files/records.yaml.en.rst
index ed5901c4d1..e71f9d0e7f 100644
--- a/doc/admin-guide/files/records.yaml.en.rst
+++ b/doc/admin-guide/files/records.yaml.en.rst
@@ -4877,6 +4877,11 @@ UDP Configuration
    and disables it automatically if it causes send errors.
 
 
+.. ts:cv:: CONFIG proxy.config.udp.enable_gro INT 1
+
+   Enables (``1``) or disables (``0``) UDP GRO. When enabled, |TS| will try to 
use it
+   when reading the UDP socket.
+
 Plug-in Configuration
 =====================
 
diff --git a/iocore/eventsystem/I_SocketManager.h 
b/iocore/eventsystem/I_SocketManager.h
index 591bd2ac23..a4775b9866 100644
--- a/iocore/eventsystem/I_SocketManager.h
+++ b/iocore/eventsystem/I_SocketManager.h
@@ -82,6 +82,9 @@ int64_t pwrite(int fd, void *buf, int len, off_t offset, char 
*tag = nullptr);
 int send(int fd, void *buf, int len, int flags);
 int sendto(int fd, void *buf, int len, int flags, struct sockaddr const *to, 
int tolen);
 int sendmsg(int fd, struct msghdr *m, int flags, void *pOLP = nullptr);
+#ifdef HAVE_RECVMMSG
+int recvmmsg(int fd, struct mmsghdr *msgvec, int vlen, int flags, struct 
timespec *timeout, void *pOLP = nullptr);
+#endif
 int64_t lseek(int fd, off_t offset, int whence);
 int fsync(int fildes);
 int poll(struct pollfd *fds, unsigned long nfds, int timeout);
diff --git a/iocore/eventsystem/P_UnixSocketManager.h 
b/iocore/eventsystem/P_UnixSocketManager.h
index 177a3704f8..bbc8e5c546 100644
--- a/iocore/eventsystem/P_UnixSocketManager.h
+++ b/iocore/eventsystem/P_UnixSocketManager.h
@@ -119,6 +119,21 @@ SocketManager::recvmsg(int fd, struct msghdr *m, int 
flags, void * /* pOLP ATS_U
   return r;
 }
 
+#ifdef HAVE_RECVMMSG
+TS_INLINE int
+SocketManager::recvmmsg(int fd, struct mmsghdr *msgvec, int vlen, int flags, 
struct timespec *timeout, void * /* pOLP ATS_UNUSED */)
+{
+  int r;
+  do {
+    if (unlikely((r = ::recvmmsg(fd, msgvec, vlen, flags, timeout)) < 0)) {
+      r = -errno;
+      // EINVAL can occur if timeout is invalid.
+    }
+  } while (r == -EINTR);
+  return r;
+}
+#endif
+
 TS_INLINE int64_t
 SocketManager::write(int fd, void *buf, int size, void * /* pOLP ATS_UNUSED */)
 {
diff --git a/iocore/net/I_UDPPacket.h b/iocore/net/I_UDPPacket.h
index 4d88eb1242..3166238c66 100644
--- a/iocore/net/I_UDPPacket.h
+++ b/iocore/net/I_UDPPacket.h
@@ -110,7 +110,7 @@ public:
      Create a new packet to be delivered to application.
      Internal function only
   */
-  static UDPPacket *new_incoming_UDPPacket(struct sockaddr *from, struct 
sockaddr *to, Ptr<IOBufferBlock> &block);
+  static UDPPacket *new_incoming_UDPPacket(struct sockaddr *from, struct 
sockaddr *to, Ptr<IOBufferBlock> block);
 
 private:
   SLINK(UDPPacket, alink); // atomic link
diff --git a/iocore/net/P_UDPNet.h b/iocore/net/P_UDPNet.h
index 796e07a642..001d307b83 100644
--- a/iocore/net/P_UDPNet.h
+++ b/iocore/net/P_UDPNet.h
@@ -54,6 +54,10 @@ public:
 
   off_t pollCont_offset;
   off_t udpNetHandler_offset;
+
+private:
+  void read_single_message_from_net(UDPNetHandler *nh, UDPConnection *uc);
+  void read_multiple_messages_from_net(UDPNetHandler *nh, UDPConnection *xuc);
 };
 
 extern UDPNetProcessorInternal udpNetInternal;
@@ -310,6 +314,11 @@ void initialize_thread_for_udp_net(EThread *thread);
 class UDPNetHandler : public Continuation, public EThread::LoopTailHandler
 {
 public:
+  struct Cfg {
+    // Segmentation offload.
+    bool enable_gso{true};
+    bool enable_gro{true};
+  };
   // engine for outgoing packets
   UDPQueue udpOutQueue;
 
@@ -333,7 +342,13 @@ public:
   int waitForActivity(ink_hrtime timeout) override;
   void signalActivity() override;
 
-  UDPNetHandler(bool enable_gso);
+  UDPNetHandler(Cfg &&cfg);
+
+  // GRO
+  bool is_gro_enabled() const;
+
+private:
+  Cfg _cfg; // Note: may not be the best place to put this, but for now is ok.
 };
 
 static inline PollCont *
diff --git a/iocore/net/QUICPacketHandler_quiche.cc 
b/iocore/net/QUICPacketHandler_quiche.cc
index 10f6223d9e..b09b349464 100644
--- a/iocore/net/QUICPacketHandler_quiche.cc
+++ b/iocore/net/QUICPacketHandler_quiche.cc
@@ -207,7 +207,7 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket 
*udp_packet)
   int rc = quiche_header_info(buf, buf_len, QUICConnectionId::SCID_LEN, 
&version, &type, scid, &scid_len, dcid, &dcid_len, token,
                               &token_len);
   if (rc < 0) {
-    QUICDebug("Ignore packet - failed to parse header");
+    QUICDebug("Ignore packet - failed to parse header: '%d'", rc);
     udp_packet->free();
     return;
   }
diff --git a/iocore/net/UnixUDPNet.cc b/iocore/net/UnixUDPNet.cc
index 6ecfdd3ac2..97031ad81a 100644
--- a/iocore/net/UnixUDPNet.cc
+++ b/iocore/net/UnixUDPNet.cc
@@ -37,20 +37,32 @@
 #include "P_DNSConnection.h"
 #include "P_Net.h"
 #include "P_UDPNet.h"
+#include "tscore/ink_inet.h"
 
 #include "tscore/ink_sock.h"
+#include <netinet/udp.h>
 
-#include "netinet/udp.h"
 #ifndef UDP_SEGMENT
 // This is needed because old glibc may not have the constant even if Kernel 
supports it.
 #define UDP_SEGMENT 103
 #endif
 
+#ifndef UDP_GRO
+#define UDP_GRO 104
+#endif
+
 using UDPNetContHandler = int (UDPNetHandler::*)(int, void *);
 
 ClassAllocator<UDPPacket> udpPacketAllocator("udpPacketAllocator");
 EventType ET_UDP;
 
+namespace
+{
+#ifdef HAVE_RECVMMSG
+const uint32_t MAX_RECEIVE_MSG_PER_CALL{16}; //< VLEN parameter for the 
recvmmsg call.
+#endif
+}; // namespace
+
 UDPPacket *
 UDPPacket::new_UDPPacket()
 {
@@ -73,7 +85,7 @@ UDPPacket::new_UDPPacket(struct sockaddr const *to, 
ink_hrtime when, Ptr<IOBuffe
 }
 
 UDPPacket *
-UDPPacket::new_incoming_UDPPacket(struct sockaddr *from, struct sockaddr *to, 
Ptr<IOBufferBlock> &block)
+UDPPacket::new_incoming_UDPPacket(struct sockaddr *from, struct sockaddr *to, 
Ptr<IOBufferBlock> block)
 {
   UDPPacket *p = udpPacketAllocator.alloc();
 
@@ -150,8 +162,6 @@ UDPPacket::get_entire_chain_buffer(size_t *buf_len)
   if (this->_payload == nullptr) {
     IOBufferBlock *block = this->getIOBlockChain();
 
-    // No need to allocate, we will use the first slice. With the current 
slice size
-    // 2048 more likely we will using this block most of the time.
     if (block && block->next.get() == nullptr) {
       *buf_len = this->getPktLength();
       return reinterpret_cast<uint8_t *>(block->buf());
@@ -183,6 +193,12 @@ int32_t g_udp_periodicCleanupSlots;
 int32_t g_udp_periodicFreeCancelledPkts;
 int32_t g_udp_numSendRetries;
 
+namespace
+{
+// Some socket flags need to be set when certain config options are enabled, so
+// this object is kept as a global.
+UDPNetHandler::Cfg G_udp_config;
+} // namespace
 //
 // Public functions
 // See header for documentation
@@ -193,12 +209,23 @@ sockaddr_in6 G_bwGrapherLoc;
 void
 initialize_thread_for_udp_net(EThread *thread)
 {
-  int enable_gso;
+  int enable_gso, enable_gro;
   REC_ReadConfigInteger(enable_gso, "proxy.config.udp.enable_gso");
+  REC_ReadConfigInteger(enable_gro, "proxy.config.udp.enable_gro");
 
   UDPNetHandler *nh = get_UDPNetHandler(thread);
 
-  new (reinterpret_cast<ink_dummy_for_new *>(nh)) UDPNetHandler(enable_gso);
+  UDPNetHandler::Cfg cfg{static_cast<bool>(enable_gso), 
static_cast<bool>(enable_gro)};
+
+  G_udp_config = cfg; // keep a global copy.
+  new (reinterpret_cast<ink_dummy_for_new *>(nh)) 
UDPNetHandler(std::move(cfg));
+
+#ifndef SOL_UDP
+  if (G_udp_config.enable_gro) {
+    Warning("Attempted to use UDP GRO per configuration, but it is 
unavailable");
+  }
+#endif
+
   new (reinterpret_cast<ink_dummy_for_new *>(get_UDPPollCont(thread))) 
PollCont(thread->mutex);
   // The UDPNetHandler cannot be accessed across EThreads.
   // Because the UDPNetHandler should be called back immediately after 
UDPPollCont.
@@ -259,8 +286,74 @@ UDPNetProcessorInternal::start(int n_upd_threads, size_t 
stacksize)
   return 0;
 }
 
+namespace
+{
+bool
+get_ip_address_from_cmsg(struct cmsghdr *cmsg, sockaddr_in6 *toaddr)
+{
+#ifdef IP_PKTINFO
+  if (IP_PKTINFO == cmsg->cmsg_type) {
+    if (cmsg->cmsg_level == IPPROTO_IP) {
+      struct in_pktinfo *pktinfo                               = 
reinterpret_cast<struct in_pktinfo *>(CMSG_DATA(cmsg));
+      reinterpret_cast<sockaddr_in *>(toaddr)->sin_addr.s_addr = 
pktinfo->ipi_addr.s_addr;
+    }
+    return true;
+  }
+#endif
+#ifdef IP_RECVDSTADDR
+  if (IP_RECVDSTADDR == cmsg->cmsg_type) {
+    if (cmsg->cmsg_level == IPPROTO_IP) {
+      struct in_addr *addr                                     = 
reinterpret_cast<struct in_addr *>(CMSG_DATA(cmsg));
+      reinterpret_cast<sockaddr_in *>(toaddr)->sin_addr.s_addr = addr->s_addr;
+    }
+    return true;
+  }
+#endif
+#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO)
+  if (IPV6_PKTINFO == cmsg->cmsg_type) { // IPV6_RECVPKTINFO uses IPV6_PKTINFO 
too
+    if (cmsg->cmsg_level == IPPROTO_IPV6) {
+      struct in6_pktinfo *pktinfo = reinterpret_cast<struct in6_pktinfo 
*>(CMSG_DATA(cmsg));
+      memcpy(toaddr->sin6_addr.s6_addr, &pktinfo->ipi6_addr, 16);
+    }
+    return true;
+  }
+#endif
+  return false;
+}
+
+unsigned int
+build_iovec_block_chain(unsigned max_niov, int64_t size_index, 
Ptr<IOBufferBlock> &chain, struct iovec *out_tiovec)
+{
+  unsigned int niov;
+  IOBufferBlock *b, *last;
+
+  // build struct iov
+  // reuse the block in chain if available
+  b    = chain.get();
+  last = nullptr;
+  for (niov = 0; niov < max_niov; niov++) {
+    if (b == nullptr) {
+      b = new_IOBufferBlock();
+      b->alloc(size_index);
+      if (last == nullptr) {
+        chain = b;
+      } else {
+        last->next = b;
+      }
+    }
+
+    out_tiovec[niov].iov_base = b->buf();
+    out_tiovec[niov].iov_len  = b->block_size();
+
+    last = b;
+    b    = b->next.get();
+  }
+  return niov;
+}
+} // namespace
+
 void
-UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler *nh, UDPConnection 
*xuc)
+UDPNetProcessorInternal::read_single_message_from_net(UDPNetHandler *nh, 
UDPConnection *xuc)
 {
   UnixUDPConnection *uc = (UnixUDPConnection *)xuc;
 
@@ -269,7 +362,7 @@ UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler 
*nh, UDPConnection *xuc
   int64_t r;
   int iters         = 0;
   unsigned max_niov = 32;
-
+  int64_t gso_size{0}; // in case is available.
   struct msghdr msg;
   Ptr<IOBufferBlock> chain, next_chain;
   struct iovec tiovec[max_niov];
@@ -280,42 +373,37 @@ UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler 
*nh, UDPConnection *xuc
   // And there is 8 octets in 'User Datagram Header' which means the max 
length of payload is no more than 65527 bytes.
   do {
     // create IOBufferBlock chain to receive data
-    unsigned int niov;
-    IOBufferBlock *b, *last;
-
-    // build struct iov
-    // reuse the block in chain if available
-    b    = chain.get();
-    last = nullptr;
-    for (niov = 0; niov < max_niov; niov++) {
-      if (b == nullptr) {
-        b = new_IOBufferBlock();
-        b->alloc(size_index);
-        if (last == nullptr) {
-          chain = b;
-        } else {
-          last->next = b;
-        }
-      }
-
-      tiovec[niov].iov_base = b->buf();
-      tiovec[niov].iov_len  = b->block_size();
-
-      last = b;
-      b    = b->next.get();
-    }
+    unsigned int niov = build_iovec_block_chain(max_niov, size_index, chain, 
tiovec);
 
     // build struct msghdr
     sockaddr_in6 fromaddr;
     sockaddr_in6 toaddr;
-    int toaddr_len = sizeof(toaddr);
-    char *cbuf[1024];
-    msg.msg_name       = &fromaddr;
-    msg.msg_namelen    = sizeof(fromaddr);
-    msg.msg_iov        = tiovec;
-    msg.msg_iovlen     = niov;
-    msg.msg_control    = cbuf;
-    msg.msg_controllen = sizeof(cbuf);
+    int toaddr_len  = sizeof(toaddr);
+    msg.msg_name    = &fromaddr;
+    msg.msg_namelen = sizeof(fromaddr);
+    msg.msg_iov     = tiovec;
+    msg.msg_iovlen  = niov;
+
+    static const size_t cmsg_size
+    {
+      CMSG_SPACE(sizeof(int))
+#ifdef IP_PKTINFO
+      +CMSG_SPACE(sizeof(struct in_pktinfo))
+#endif
+#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO)
+        + CMSG_SPACE(sizeof(struct in6_pktinfo))
+#endif
+#ifdef IP_RECVDSTADDR
+        + CMSG_SPACE(sizeof(struct in_addr))
+#endif
+#ifdef UDP_GRO
+        + CMSG_SPACE(sizeof(uint16_t))
+#endif
+    };
+
+    char control[cmsg_size] = {0};
+    msg.msg_control         = control;
+    msg.msg_controllen      = cmsg_size;
 
     // receive data by recvmsg
     r = SocketManager::recvmsg(uc->getFd(), &msg, 0);
@@ -329,61 +417,56 @@ UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler 
*nh, UDPConnection *xuc
       Debug("udp-read", "The UDP packet is truncated");
     }
 
-    // fill the IOBufferBlock chain
-    int64_t saved = r;
-    b             = chain.get();
-    while (b && saved > 0) {
-      if (saved > buffer_size) {
-        b->fill(buffer_size);
-        saved -= buffer_size;
-        b      = b->next.get();
-      } else {
-        b->fill(saved);
-        saved      = 0;
-        next_chain = b->next.get();
-        b->next    = nullptr;
-      }
-    }
-
     safe_getsockname(xuc->getFd(), reinterpret_cast<struct sockaddr 
*>(&toaddr), &toaddr_len);
     for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg = 
CMSG_NXTHDR(&msg, cmsg)) {
-      switch (cmsg->cmsg_type) {
-#ifdef IP_PKTINFO
-      case IP_PKTINFO:
-        if (cmsg->cmsg_level == IPPROTO_IP) {
-          struct in_pktinfo *pktinfo                                = 
reinterpret_cast<struct in_pktinfo *>(CMSG_DATA(cmsg));
-          reinterpret_cast<sockaddr_in *>(&toaddr)->sin_addr.s_addr = 
pktinfo->ipi_addr.s_addr;
-        }
+      if (get_ip_address_from_cmsg(cmsg, &toaddr)) {
         break;
-#endif
-#ifdef IP_RECVDSTADDR
-      case IP_RECVDSTADDR:
-        if (cmsg->cmsg_level == IPPROTO_IP) {
-          struct in_addr *addr                                      = 
reinterpret_cast<struct in_addr *>(CMSG_DATA(cmsg));
-          reinterpret_cast<sockaddr_in *>(&toaddr)->sin_addr.s_addr = 
addr->s_addr;
+      }
+#ifdef SOL_UDP
+      if (UDP_GRO == cmsg->cmsg_type) {
+        if (nh->is_gro_enabled()) {
+          gso_size = *reinterpret_cast<uint16_t *>(CMSG_DATA(cmsg));
         }
         break;
+      }
 #endif
-#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO)
-      case IPV6_PKTINFO: // IPV6_RECVPKTINFO uses IPV6_PKTINFO too
-        if (cmsg->cmsg_level == IPPROTO_IPV6) {
-          struct in6_pktinfo *pktinfo = reinterpret_cast<struct in6_pktinfo 
*>(CMSG_DATA(cmsg));
-          memcpy(toaddr.sin6_addr.s6_addr, &pktinfo->ipi6_addr, 16);
+    }
+
+    // If gro was used, then the kernel will tell us the size of each part 
that was spliced together.
+    Debug("udp-read", "Received %lld bytes. gso_size %lld (%s)", 
static_cast<long long>(r), static_cast<long long>(gso_size),
+          (gso_size > 0 ? "GRO" : "No GRO"));
+
+    IOBufferBlock *block;
+    int64_t remaining{r};
+    while (remaining > 0) {
+      block                 = chain.get();
+      int64_t this_packet_r = gso_size ? std::min(gso_size, r) : r;
+      while (block && this_packet_r > 0) {
+        if (this_packet_r > buffer_size) {
+          block->fill(buffer_size);
+          this_packet_r -= buffer_size;
+          block          = block->next.get();
+          remaining     -= buffer_size;
+        } else {
+          block->fill(this_packet_r);
+          remaining     -= this_packet_r;
+          this_packet_r  = 0;
+          next_chain     = block->next.get();
+          block->next    = nullptr;
         }
-        break;
-#endif
       }
+      Debug("udp-read", "Creating packet");
+      // create packet
+      UDPPacket *p = 
UDPPacket::new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr), 
ats_ip_sa_cast(&toaddr), chain);
+      p->setConnection(uc);
+      // queue onto the UDPConnection
+      uc->inQueue.push(p);
+
+      // reload the unused block
+      chain      = next_chain;
+      next_chain = nullptr;
     }
 
-    // create packet
-    UDPPacket *p = 
UDPPacket::new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr), 
ats_ip_sa_cast(&toaddr), chain);
-    p->setConnection(uc);
-    // queue onto the UDPConnection
-    uc->inQueue.push(p);
-
-    // reload the unused block
-    chain      = next_chain;
-    next_chain = nullptr;
     iters++;
   } while (r > 0);
   if (iters >= 1) {
@@ -399,6 +482,170 @@ UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler 
*nh, UDPConnection *xuc
   }
 }
 
+#ifdef HAVE_RECVMMSG
+void
+UDPNetProcessorInternal::read_multiple_messages_from_net(UDPNetHandler *nh, 
UDPConnection *xuc)
+{
+  UnixUDPConnection *uc = (UnixUDPConnection *)xuc;
+
+  std::array<Ptr<IOBufferBlock>, MAX_RECEIVE_MSG_PER_CALL> buffer_chain;
+  unsigned max_niov = 32;
+  int64_t gso_size{0}; // In case is available
+
+  struct mmsghdr mmsg[MAX_RECEIVE_MSG_PER_CALL];
+  struct iovec tiovec[MAX_RECEIVE_MSG_PER_CALL][max_niov];
+
+  // Addresses
+  sockaddr_in6 fromaddr[MAX_RECEIVE_MSG_PER_CALL];
+  sockaddr_in6 toaddr[MAX_RECEIVE_MSG_PER_CALL];
+  int toaddr_len = sizeof(toaddr);
+
+  size_t total_bytes_read{0};
+
+  static const size_t cmsg_size
+  {
+    CMSG_SPACE(sizeof(int))
+#ifdef IP_PKTINFO
+    +CMSG_SPACE(sizeof(struct in_pktinfo))
+#endif
+#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO)
+      + CMSG_SPACE(sizeof(struct in6_pktinfo))
+#endif
+#ifdef IP_RECVDSTADDR
+      + CMSG_SPACE(sizeof(struct in_addr))
+#endif
+#ifdef UDP_GRO
+      + CMSG_SPACE(sizeof(uint16_t))
+#endif
+  };
+
+  // Ancillary data.
+  char control[MAX_RECEIVE_MSG_PER_CALL][cmsg_size];
+
+  int64_t size_index  = BUFFER_SIZE_INDEX_2K;
+  int64_t buffer_size = BUFFER_SIZE_FOR_INDEX(size_index);
+  // The max length of receive buffer is 32 * buffer_size (2048) = 65536 bytes.
+  // Because the 'UDP Length' is type of uint16_t defined in RFC 768.
+  // And there is 8 octets in 'User Datagram Header' which means the max 
length of payload is no more than 65527 bytes.
+
+  for (uint32_t msg_num = 0; msg_num < MAX_RECEIVE_MSG_PER_CALL; msg_num++) {
+    // build each block chain.
+    unsigned int niov = build_iovec_block_chain(max_niov, size_index, 
buffer_chain[msg_num], tiovec[msg_num]);
+
+    mmsg[msg_num].msg_hdr.msg_iov     = tiovec[msg_num];
+    mmsg[msg_num].msg_hdr.msg_iovlen  = niov;
+    mmsg[msg_num].msg_hdr.msg_name    = &fromaddr[msg_num];
+    mmsg[msg_num].msg_hdr.msg_namelen = sizeof(fromaddr[msg_num]);
+    memset(control[msg_num], 0, cmsg_size);
+    mmsg[msg_num].msg_hdr.msg_control    = control[msg_num];
+    mmsg[msg_num].msg_hdr.msg_controllen = cmsg_size;
+  }
+
+  const int return_val = SocketManager::recvmmsg(uc->getFd(), mmsg, 
MAX_RECEIVE_MSG_PER_CALL, MSG_WAITFORONE, nullptr);
+
+  if (return_val <= 0) {
+    Debug("udp-read", "Done. recvmmsg() ret is %d, errno %s", return_val, 
strerror(errno));
+    return;
+  }
+  Debug("udp-read", "recvmmsg() read %d packets", return_val);
+
+  Ptr<IOBufferBlock> chain, next_chain;
+  for (auto packet_num = 0; packet_num < return_val; packet_num++) {
+    gso_size = 0;
+
+    Debug("udp-read", "Processing message %d from a total of %d", packet_num, 
return_val);
+    struct msghdr &mhdr = mmsg[packet_num].msg_hdr;
+
+    if (mhdr.msg_flags & MSG_TRUNC) {
+      Debug("udp-read", "The UDP packet is truncated");
+      break;
+    }
+
+    if (mhdr.msg_namelen <= 0) {
+      Debug("udp-read", "Unable to get remote address from recvmmsg() for fd: 
%d", uc->getFd());
+      return;
+    }
+
+    safe_getsockname(xuc->getFd(), reinterpret_cast<struct sockaddr 
*>(&toaddr[packet_num]), &toaddr_len);
+    if (mhdr.msg_controllen > 0) {
+      for (auto cmsg = CMSG_FIRSTHDR(&mhdr); cmsg != nullptr; cmsg = 
CMSG_NXTHDR(&mhdr, cmsg)) {
+        if (get_ip_address_from_cmsg(cmsg, &toaddr[packet_num])) {
+          break;
+        }
+#ifdef SOL_UDP
+        if (UDP_GRO == cmsg->cmsg_type) {
+          if (nh->is_gro_enabled()) {
+            gso_size = *reinterpret_cast<uint16_t *>(CMSG_DATA(cmsg));
+          }
+          break;
+        }
+#endif
+      }
+    }
+
+    const int64_t received  = mmsg[packet_num].msg_len;
+    total_bytes_read       += received;
+
+    // If gro was used, then the kernel will tell us the size of each part 
that was spliced together.
+    Debug("udp-read", "Received %lld bytes. gso_size %lld (%s)", 
static_cast<long long>(received), static_cast<long long>(gso_size),
+          (gso_size > 0 ? "GRO" : "No GRO"));
+
+    auto chain = buffer_chain[packet_num];
+    IOBufferBlock *block;
+    int64_t remaining{received};
+    while (remaining > 0) {
+      block                 = chain.get();
+      int64_t this_packet_r = gso_size ? std::min(gso_size, received) : 
received;
+      while (block && this_packet_r > 0) {
+        if (this_packet_r > buffer_size) {
+          block->fill(buffer_size);
+          this_packet_r -= buffer_size;
+          block          = block->next.get();
+          remaining     -= buffer_size;
+        } else {
+          block->fill(this_packet_r);
+          remaining     -= this_packet_r;
+          this_packet_r  = 0;
+          next_chain     = block->next.get();
+          block->next    = nullptr;
+        }
+      }
+      Debug("udp-read", "Creating packet");
+      // create packet
+      UDPPacket *p =
+        
UDPPacket::new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr[packet_num]), 
ats_ip_sa_cast(&toaddr[packet_num]), chain);
+      p->setConnection(uc);
+      // queue onto the UDPConnection
+      uc->inQueue.push(p);
+
+      // reload the unused block
+      chain      = next_chain;
+      next_chain = nullptr;
+    }
+  }
+  Debug("udp-read", "Total bytes read %ld from %d packets.", total_bytes_read, 
return_val);
+
+  // if not already on to-be-called-back queue, then add it.
+  if (!uc->onCallbackQueue) {
+    ink_assert(uc->callback_link.next == nullptr);
+    ink_assert(uc->callback_link.prev == nullptr);
+    uc->AddRef();
+    nh->udp_callbacks.enqueue(uc);
+    uc->onCallbackQueue = 1;
+  }
+}
+#endif
+
+void
+UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler *nh, UDPConnection 
*xuc)
+{
+#if HAVE_RECVMMSG
+  read_multiple_messages_from_net(nh, xuc);
+#else
+  read_single_message_from_net(nh, xuc);
+#endif
+}
+
 int
 UDPNetProcessorInternal::udp_callback(UDPNetHandler *nh, UDPConnection *xuc, 
EThread *thread)
 {
@@ -929,6 +1176,17 @@ UDPNetProcessor::UDPBind(Continuation *cont, sockaddr 
const *addr, int fd, int s
     }
   }
 
+#ifdef SOL_UDP
+  if (G_udp_config.enable_gro) {
+    int gro = 1;
+    if (safe_setsockopt(fd, IPPROTO_UDP, UDP_GRO, (char *)&gro, sizeof(gro)) 
== 0) {
+      Debug("udpnet", "setsockopt UDP_GRO ok");
+    } else {
+      Debug("udpnet", "setsockopt UDP_GRO. errno=%d", errno);
+    }
+  }
+#endif
+
   // If this is a class D address (i.e. multicast address), use REUSEADDR.
   if (ats_is_ip_multicast(addr)) {
     if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEADDR) < 0) {
@@ -957,6 +1215,7 @@ UDPNetProcessor::UDPBind(Continuation *cont, sockaddr 
const *addr, int fd, int s
     goto Lerror;
   }
 
+  // check this for GRO
   if (recv_bufsize) {
     if (unlikely(SocketManager::set_rcvbuf_size(fd, recv_bufsize))) {
       Debug("udpnet", "set_dnsbuf_size(%d) failed", recv_bufsize);
@@ -1449,13 +1708,23 @@ net_signal_hook_callback(EThread *thread)
 #endif
 }
 
-UDPNetHandler::UDPNetHandler(bool enable_gso) : udpOutQueue(enable_gso)
+UDPNetHandler::UDPNetHandler(Cfg &&cfg) : udpOutQueue(cfg.enable_gso), 
_cfg{std::move(cfg)}
 {
   nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000);
   lastCheck = 0;
   SET_HANDLER(&UDPNetHandler::startNetEvent);
 }
 
+bool
+UDPNetHandler::is_gro_enabled() const
+{
+#ifndef SOL_UDP
+  return false;
+#else
+  return this->_cfg.enable_gro;
+#endif
+}
+
 int
 UDPNetHandler::startNetEvent(int event, Event *e)
 {
diff --git a/src/records/RecordsConfig.cc b/src/records/RecordsConfig.cc
index 65fd3e719c..c827cb4be1 100644
--- a/src/records/RecordsConfig.cc
+++ b/src/records/RecordsConfig.cc
@@ -260,7 +260,8 @@ static const RecordElement RecordsConfig[] =
   ,
   {RECT_CONFIG, "proxy.config.udp.enable_gso", RECD_INT, "1", RECU_NULL, 
RR_NULL, RECC_NULL, nullptr, RECA_NULL}
   ,
-
+  {RECT_CONFIG, "proxy.config.udp.enable_gro", RECD_INT, "1", RECU_NULL, 
RR_NULL, RECC_NULL, nullptr, RECA_NULL}
+  ,
   
//##############################################################################
   //#
   //# Alarm Configuration

Reply via email to