This is an automated email from the ASF dual-hosted git repository. zwoop pushed a commit to branch NewAPIMetricsPOC in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit fc69ba72e704adf4bfe0a670abce5ac9632fff99 Author: Damian Meden <[email protected]> AuthorDate: Fri Jul 7 11:33:10 2023 +0100 Add support for recvmmsg and UDP GRO (#9905) This PR adds support for UDP GRO when reading the socket using recvm(m)sg, also includes support for recvmmsg when it is available by the OS. A new record config is introduced for the GRO feature => proxy.config.udp.enable_gro --- configure.ac | 2 +- doc/admin-guide/files/records.yaml.en.rst | 5 + iocore/eventsystem/I_SocketManager.h | 3 + iocore/eventsystem/P_UnixSocketManager.h | 15 + iocore/net/I_UDPPacket.h | 2 +- iocore/net/P_UDPNet.h | 17 +- iocore/net/QUICPacketHandler_quiche.cc | 2 +- iocore/net/UnixUDPNet.cc | 441 ++++++++++++++++++++++++------ src/records/RecordsConfig.cc | 3 +- 9 files changed, 399 insertions(+), 91 deletions(-) diff --git a/configure.ac b/configure.ac index fd4690c137..d33f6a0edc 100644 --- a/configure.ac +++ b/configure.ac @@ -1390,7 +1390,7 @@ AC_CHECK_FUNCS([clock_gettime kqueue epoll_ctl posix_fadvise posix_madvise posix AC_CHECK_FUNCS([port_create strlcpy strlcat sysconf sysctlbyname getpagesize]) AC_CHECK_FUNCS([getreuid getresuid getresgid setreuid setresuid getpeereid getpeerucred]) AC_CHECK_FUNCS([strsignal psignal psiginfo accept4]) -AC_CHECK_FUNCS([sendmmsg]) +AC_CHECK_FUNCS([sendmmsg recvmmsg]) # Check for eventfd() and sys/eventfd.h (both must exist ...) AC_CHECK_HEADERS([sys/eventfd.h], [ diff --git a/doc/admin-guide/files/records.yaml.en.rst b/doc/admin-guide/files/records.yaml.en.rst index ed5901c4d1..e71f9d0e7f 100644 --- a/doc/admin-guide/files/records.yaml.en.rst +++ b/doc/admin-guide/files/records.yaml.en.rst @@ -4877,6 +4877,11 @@ UDP Configuration and disables it automatically if it causes send errors. +.. ts:cv:: CONFIG proxy.config.udp.enable_gro INT 1 + + Enables (``1``) or disables (``0``) UDP GRO. When enabled, |TS| will try to use it + when reading the UDP socket. 
+ Plug-in Configuration ===================== diff --git a/iocore/eventsystem/I_SocketManager.h b/iocore/eventsystem/I_SocketManager.h index 591bd2ac23..a4775b9866 100644 --- a/iocore/eventsystem/I_SocketManager.h +++ b/iocore/eventsystem/I_SocketManager.h @@ -82,6 +82,9 @@ int64_t pwrite(int fd, void *buf, int len, off_t offset, char *tag = nullptr); int send(int fd, void *buf, int len, int flags); int sendto(int fd, void *buf, int len, int flags, struct sockaddr const *to, int tolen); int sendmsg(int fd, struct msghdr *m, int flags, void *pOLP = nullptr); +#ifdef HAVE_RECVMMSG +int recvmmsg(int fd, struct mmsghdr *msgvec, int vlen, int flags, struct timespec *timeout, void *pOLP = nullptr); +#endif int64_t lseek(int fd, off_t offset, int whence); int fsync(int fildes); int poll(struct pollfd *fds, unsigned long nfds, int timeout); diff --git a/iocore/eventsystem/P_UnixSocketManager.h b/iocore/eventsystem/P_UnixSocketManager.h index 177a3704f8..bbc8e5c546 100644 --- a/iocore/eventsystem/P_UnixSocketManager.h +++ b/iocore/eventsystem/P_UnixSocketManager.h @@ -119,6 +119,21 @@ SocketManager::recvmsg(int fd, struct msghdr *m, int flags, void * /* pOLP ATS_U return r; } +#ifdef HAVE_RECVMMSG +TS_INLINE int +SocketManager::recvmmsg(int fd, struct mmsghdr *msgvec, int vlen, int flags, struct timespec *timeout, void * /* pOLP ATS_UNUSED */) +{ + int r; + do { + if (unlikely((r = ::recvmmsg(fd, msgvec, vlen, flags, timeout)) < 0)) { + r = -errno; + // EINVAL can occur if timeout is invalid. + } + } while (r == -EINTR); + return r; +} +#endif + TS_INLINE int64_t SocketManager::write(int fd, void *buf, int size, void * /* pOLP ATS_UNUSED */) { diff --git a/iocore/net/I_UDPPacket.h b/iocore/net/I_UDPPacket.h index 4d88eb1242..3166238c66 100644 --- a/iocore/net/I_UDPPacket.h +++ b/iocore/net/I_UDPPacket.h @@ -110,7 +110,7 @@ public: Create a new packet to be delivered to application. 
Internal function only */ - static UDPPacket *new_incoming_UDPPacket(struct sockaddr *from, struct sockaddr *to, Ptr<IOBufferBlock> &block); + static UDPPacket *new_incoming_UDPPacket(struct sockaddr *from, struct sockaddr *to, Ptr<IOBufferBlock> block); private: SLINK(UDPPacket, alink); // atomic link diff --git a/iocore/net/P_UDPNet.h b/iocore/net/P_UDPNet.h index 796e07a642..001d307b83 100644 --- a/iocore/net/P_UDPNet.h +++ b/iocore/net/P_UDPNet.h @@ -54,6 +54,10 @@ public: off_t pollCont_offset; off_t udpNetHandler_offset; + +private: + void read_single_message_from_net(UDPNetHandler *nh, UDPConnection *uc); + void read_multiple_messages_from_net(UDPNetHandler *nh, UDPConnection *xuc); }; extern UDPNetProcessorInternal udpNetInternal; @@ -310,6 +314,11 @@ void initialize_thread_for_udp_net(EThread *thread); class UDPNetHandler : public Continuation, public EThread::LoopTailHandler { public: + struct Cfg { + // Segmentation offload. + bool enable_gso{true}; + bool enable_gro{true}; + }; // engine for outgoing packets UDPQueue udpOutQueue; @@ -333,7 +342,13 @@ public: int waitForActivity(ink_hrtime timeout) override; void signalActivity() override; - UDPNetHandler(bool enable_gso); + UDPNetHandler(Cfg &&cfg); + + // GRO + bool is_gro_enabled() const; + +private: + Cfg _cfg; // Note: may not be the best place to put this, but for now is ok. 
}; static inline PollCont * diff --git a/iocore/net/QUICPacketHandler_quiche.cc b/iocore/net/QUICPacketHandler_quiche.cc index 10f6223d9e..b09b349464 100644 --- a/iocore/net/QUICPacketHandler_quiche.cc +++ b/iocore/net/QUICPacketHandler_quiche.cc @@ -207,7 +207,7 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) int rc = quiche_header_info(buf, buf_len, QUICConnectionId::SCID_LEN, &version, &type, scid, &scid_len, dcid, &dcid_len, token, &token_len); if (rc < 0) { - QUICDebug("Ignore packet - failed to parse header"); + QUICDebug("Ignore packet - failed to parse header: '%d'", rc); udp_packet->free(); return; } diff --git a/iocore/net/UnixUDPNet.cc b/iocore/net/UnixUDPNet.cc index 6ecfdd3ac2..97031ad81a 100644 --- a/iocore/net/UnixUDPNet.cc +++ b/iocore/net/UnixUDPNet.cc @@ -37,20 +37,32 @@ #include "P_DNSConnection.h" #include "P_Net.h" #include "P_UDPNet.h" +#include "tscore/ink_inet.h" #include "tscore/ink_sock.h" +#include <netinet/udp.h> -#include "netinet/udp.h" #ifndef UDP_SEGMENT // This is needed because old glibc may not have the constant even if Kernel supports it. #define UDP_SEGMENT 103 #endif +#ifndef UDP_GRO +#define UDP_GRO 104 +#endif + using UDPNetContHandler = int (UDPNetHandler::*)(int, void *); ClassAllocator<UDPPacket> udpPacketAllocator("udpPacketAllocator"); EventType ET_UDP; +namespace +{ +#ifdef HAVE_RECVMMSG +const uint32_t MAX_RECEIVE_MSG_PER_CALL{16}; //< VLEN parameter for the recvmmsg call. 
+#endif +}; // namespace + UDPPacket * UDPPacket::new_UDPPacket() { @@ -73,7 +85,7 @@ UDPPacket::new_UDPPacket(struct sockaddr const *to, ink_hrtime when, Ptr<IOBuffe } UDPPacket * -UDPPacket::new_incoming_UDPPacket(struct sockaddr *from, struct sockaddr *to, Ptr<IOBufferBlock> &block) +UDPPacket::new_incoming_UDPPacket(struct sockaddr *from, struct sockaddr *to, Ptr<IOBufferBlock> block) { UDPPacket *p = udpPacketAllocator.alloc(); @@ -150,8 +162,6 @@ UDPPacket::get_entire_chain_buffer(size_t *buf_len) if (this->_payload == nullptr) { IOBufferBlock *block = this->getIOBlockChain(); - // No need to allocate, we will use the first slice. With the current slice size - // 2048 more likely we will using this block most of the time. if (block && block->next.get() == nullptr) { *buf_len = this->getPktLength(); return reinterpret_cast<uint8_t *>(block->buf()); @@ -183,6 +193,12 @@ int32_t g_udp_periodicCleanupSlots; int32_t g_udp_periodicFreeCancelledPkts; int32_t g_udp_numSendRetries; +namespace +{ +// We'd need to set some flags in case some of the config is enabled. So we have +// this object as global. +UDPNetHandler::Cfg G_udp_config; +} // namespace // // Public functions // See header for documentation @@ -193,12 +209,23 @@ sockaddr_in6 G_bwGrapherLoc; void initialize_thread_for_udp_net(EThread *thread) { - int enable_gso; + int enable_gso, enable_gro; REC_ReadConfigInteger(enable_gso, "proxy.config.udp.enable_gso"); + REC_ReadConfigInteger(enable_gro, "proxy.config.udp.enable_gro"); UDPNetHandler *nh = get_UDPNetHandler(thread); - new (reinterpret_cast<ink_dummy_for_new *>(nh)) UDPNetHandler(enable_gso); + UDPNetHandler::Cfg cfg{static_cast<bool>(enable_gso), static_cast<bool>(enable_gro)}; + + G_udp_config = cfg; // keep a global copy. 
+ new (reinterpret_cast<ink_dummy_for_new *>(nh)) UDPNetHandler(std::move(cfg)); + +#ifndef SOL_UDP + if (G_udp_config.enable_gro) { + Warning("Attempted to use UDP GRO per configuration, but it is unavailable"); + } +#endif + new (reinterpret_cast<ink_dummy_for_new *>(get_UDPPollCont(thread))) PollCont(thread->mutex); // The UDPNetHandler cannot be accessed across EThreads. // Because the UDPNetHandler should be called back immediately after UDPPollCont. @@ -259,8 +286,74 @@ UDPNetProcessorInternal::start(int n_upd_threads, size_t stacksize) return 0; } +namespace +{ +bool +get_ip_address_from_cmsg(struct cmsghdr *cmsg, sockaddr_in6 *toaddr) +{ +#ifdef IP_PKTINFO + if (IP_PKTINFO == cmsg->cmsg_type) { + if (cmsg->cmsg_level == IPPROTO_IP) { + struct in_pktinfo *pktinfo = reinterpret_cast<struct in_pktinfo *>(CMSG_DATA(cmsg)); + reinterpret_cast<sockaddr_in *>(toaddr)->sin_addr.s_addr = pktinfo->ipi_addr.s_addr; + } + return true; + } +#endif +#ifdef IP_RECVDSTADDR + if (IP_RECVDSTADDR == cmsg->cmsg_type) { + if (cmsg->cmsg_level == IPPROTO_IP) { + struct in_addr *addr = reinterpret_cast<struct in_addr *>(CMSG_DATA(cmsg)); + reinterpret_cast<sockaddr_in *>(toaddr)->sin_addr.s_addr = addr->s_addr; + } + return true; + } +#endif +#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO) + if (IPV6_PKTINFO == cmsg->cmsg_type) { // IPV6_RECVPKTINFO uses IPV6_PKTINFO too + if (cmsg->cmsg_level == IPPROTO_IPV6) { + struct in6_pktinfo *pktinfo = reinterpret_cast<struct in6_pktinfo *>(CMSG_DATA(cmsg)); + memcpy(toaddr->sin6_addr.s6_addr, &pktinfo->ipi6_addr, 16); + } + return true; + } +#endif + return false; +} + +unsigned int +build_iovec_block_chain(unsigned max_niov, int64_t size_index, Ptr<IOBufferBlock> &chain, struct iovec *out_tiovec) +{ + unsigned int niov; + IOBufferBlock *b, *last; + + // build struct iov + // reuse the block in chain if available + b = chain.get(); + last = nullptr; + for (niov = 0; niov < max_niov; niov++) { + if (b == nullptr) { + b = 
new_IOBufferBlock(); + b->alloc(size_index); + if (last == nullptr) { + chain = b; + } else { + last->next = b; + } + } + + out_tiovec[niov].iov_base = b->buf(); + out_tiovec[niov].iov_len = b->block_size(); + + last = b; + b = b->next.get(); + } + return niov; +} +} // namespace + void -UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler *nh, UDPConnection *xuc) +UDPNetProcessorInternal::read_single_message_from_net(UDPNetHandler *nh, UDPConnection *xuc) { UnixUDPConnection *uc = (UnixUDPConnection *)xuc; @@ -269,7 +362,7 @@ UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler *nh, UDPConnection *xuc int64_t r; int iters = 0; unsigned max_niov = 32; - + int64_t gso_size{0}; // in case is available. struct msghdr msg; Ptr<IOBufferBlock> chain, next_chain; struct iovec tiovec[max_niov]; @@ -280,42 +373,37 @@ UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler *nh, UDPConnection *xuc // And there is 8 octets in 'User Datagram Header' which means the max length of payload is no more than 65527 bytes. 
do { // create IOBufferBlock chain to receive data - unsigned int niov; - IOBufferBlock *b, *last; - - // build struct iov - // reuse the block in chain if available - b = chain.get(); - last = nullptr; - for (niov = 0; niov < max_niov; niov++) { - if (b == nullptr) { - b = new_IOBufferBlock(); - b->alloc(size_index); - if (last == nullptr) { - chain = b; - } else { - last->next = b; - } - } - - tiovec[niov].iov_base = b->buf(); - tiovec[niov].iov_len = b->block_size(); - - last = b; - b = b->next.get(); - } + unsigned int niov = build_iovec_block_chain(max_niov, size_index, chain, tiovec); // build struct msghdr sockaddr_in6 fromaddr; sockaddr_in6 toaddr; - int toaddr_len = sizeof(toaddr); - char *cbuf[1024]; - msg.msg_name = &fromaddr; - msg.msg_namelen = sizeof(fromaddr); - msg.msg_iov = tiovec; - msg.msg_iovlen = niov; - msg.msg_control = cbuf; - msg.msg_controllen = sizeof(cbuf); + int toaddr_len = sizeof(toaddr); + msg.msg_name = &fromaddr; + msg.msg_namelen = sizeof(fromaddr); + msg.msg_iov = tiovec; + msg.msg_iovlen = niov; + + static const size_t cmsg_size + { + CMSG_SPACE(sizeof(int)) +#ifdef IP_PKTINFO + +CMSG_SPACE(sizeof(struct in_pktinfo)) +#endif +#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO) + + CMSG_SPACE(sizeof(struct in6_pktinfo)) +#endif +#ifdef IP_RECVDSTADDR + + CMSG_SPACE(sizeof(struct in_addr)) +#endif +#ifdef UDP_GRO + + CMSG_SPACE(sizeof(uint16_t)) +#endif + }; + + char control[cmsg_size] = {0}; + msg.msg_control = control; + msg.msg_controllen = cmsg_size; // receive data by recvmsg r = SocketManager::recvmsg(uc->getFd(), &msg, 0); @@ -329,61 +417,56 @@ UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler *nh, UDPConnection *xuc Debug("udp-read", "The UDP packet is truncated"); } - // fill the IOBufferBlock chain - int64_t saved = r; - b = chain.get(); - while (b && saved > 0) { - if (saved > buffer_size) { - b->fill(buffer_size); - saved -= buffer_size; - b = b->next.get(); - } else { - b->fill(saved); - saved = 0; - 
next_chain = b->next.get(); - b->next = nullptr; - } - } - safe_getsockname(xuc->getFd(), reinterpret_cast<struct sockaddr *>(&toaddr), &toaddr_len); for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) { - switch (cmsg->cmsg_type) { -#ifdef IP_PKTINFO - case IP_PKTINFO: - if (cmsg->cmsg_level == IPPROTO_IP) { - struct in_pktinfo *pktinfo = reinterpret_cast<struct in_pktinfo *>(CMSG_DATA(cmsg)); - reinterpret_cast<sockaddr_in *>(&toaddr)->sin_addr.s_addr = pktinfo->ipi_addr.s_addr; - } + if (get_ip_address_from_cmsg(cmsg, &toaddr)) { break; -#endif -#ifdef IP_RECVDSTADDR - case IP_RECVDSTADDR: - if (cmsg->cmsg_level == IPPROTO_IP) { - struct in_addr *addr = reinterpret_cast<struct in_addr *>(CMSG_DATA(cmsg)); - reinterpret_cast<sockaddr_in *>(&toaddr)->sin_addr.s_addr = addr->s_addr; + } +#ifdef SOL_UDP + if (UDP_GRO == cmsg->cmsg_type) { + if (nh->is_gro_enabled()) { + gso_size = *reinterpret_cast<uint16_t *>(CMSG_DATA(cmsg)); } break; + } #endif -#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO) - case IPV6_PKTINFO: // IPV6_RECVPKTINFO uses IPV6_PKTINFO too - if (cmsg->cmsg_level == IPPROTO_IPV6) { - struct in6_pktinfo *pktinfo = reinterpret_cast<struct in6_pktinfo *>(CMSG_DATA(cmsg)); - memcpy(toaddr.sin6_addr.s6_addr, &pktinfo->ipi6_addr, 16); + } + + // If gro was used, then the kernel will tell us the size of each part that was spliced together. + Debug("udp-read", "Received %lld bytes. gso_size %lld (%s)", static_cast<long long>(r), static_cast<long long>(gso_size), + (gso_size > 0 ? "GRO" : "No GRO")); + + IOBufferBlock *block; + int64_t remaining{r}; + while (remaining > 0) { + block = chain.get(); + int64_t this_packet_r = gso_size ? 
std::min(gso_size, r) : r; + while (block && this_packet_r > 0) { + if (this_packet_r > buffer_size) { + block->fill(buffer_size); + this_packet_r -= buffer_size; + block = block->next.get(); + remaining -= buffer_size; + } else { + block->fill(this_packet_r); + remaining -= this_packet_r; + this_packet_r = 0; + next_chain = block->next.get(); + block->next = nullptr; } - break; -#endif } + Debug("udp-read", "Creating packet"); + // create packet + UDPPacket *p = UDPPacket::new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr), ats_ip_sa_cast(&toaddr), chain); + p->setConnection(uc); + // queue onto the UDPConnection + uc->inQueue.push(p); + + // reload the unused block + chain = next_chain; + next_chain = nullptr; } - // create packet - UDPPacket *p = UDPPacket::new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr), ats_ip_sa_cast(&toaddr), chain); - p->setConnection(uc); - // queue onto the UDPConnection - uc->inQueue.push(p); - - // reload the unused block - chain = next_chain; - next_chain = nullptr; iters++; } while (r > 0); if (iters >= 1) { @@ -399,6 +482,170 @@ UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler *nh, UDPConnection *xuc } } +#ifdef HAVE_RECVMMSG +void +UDPNetProcessorInternal::read_multiple_messages_from_net(UDPNetHandler *nh, UDPConnection *xuc) +{ + UnixUDPConnection *uc = (UnixUDPConnection *)xuc; + + std::array<Ptr<IOBufferBlock>, MAX_RECEIVE_MSG_PER_CALL> buffer_chain; + unsigned max_niov = 32; + int64_t gso_size{0}; // In case is available + + struct mmsghdr mmsg[MAX_RECEIVE_MSG_PER_CALL]; + struct iovec tiovec[MAX_RECEIVE_MSG_PER_CALL][max_niov]; + + // Addresses + sockaddr_in6 fromaddr[MAX_RECEIVE_MSG_PER_CALL]; + sockaddr_in6 toaddr[MAX_RECEIVE_MSG_PER_CALL]; + int toaddr_len = sizeof(toaddr); + + size_t total_bytes_read{0}; + + static const size_t cmsg_size + { + CMSG_SPACE(sizeof(int)) +#ifdef IP_PKTINFO + +CMSG_SPACE(sizeof(struct in_pktinfo)) +#endif +#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO) + + CMSG_SPACE(sizeof(struct 
in6_pktinfo)) +#endif +#ifdef IP_RECVDSTADDR + + CMSG_SPACE(sizeof(struct in_addr)) +#endif +#ifdef UDP_GRO + + CMSG_SPACE(sizeof(uint16_t)) +#endif + }; + + // Ancillary data. + char control[MAX_RECEIVE_MSG_PER_CALL][cmsg_size]; + + int64_t size_index = BUFFER_SIZE_INDEX_2K; + int64_t buffer_size = BUFFER_SIZE_FOR_INDEX(size_index); + // The max length of receive buffer is 32 * buffer_size (2048) = 65536 bytes. + // Because the 'UDP Length' is type of uint16_t defined in RFC 768. + // And there is 8 octets in 'User Datagram Header' which means the max length of payload is no more than 65527 bytes. + + for (uint32_t msg_num = 0; msg_num < MAX_RECEIVE_MSG_PER_CALL; msg_num++) { + // build each block chain. + unsigned int niov = build_iovec_block_chain(max_niov, size_index, buffer_chain[msg_num], tiovec[msg_num]); + + mmsg[msg_num].msg_hdr.msg_iov = tiovec[msg_num]; + mmsg[msg_num].msg_hdr.msg_iovlen = niov; + mmsg[msg_num].msg_hdr.msg_name = &fromaddr[msg_num]; + mmsg[msg_num].msg_hdr.msg_namelen = sizeof(fromaddr[msg_num]); + memset(control[msg_num], 0, cmsg_size); + mmsg[msg_num].msg_hdr.msg_control = control[msg_num]; + mmsg[msg_num].msg_hdr.msg_controllen = cmsg_size; + } + + const int return_val = SocketManager::recvmmsg(uc->getFd(), mmsg, MAX_RECEIVE_MSG_PER_CALL, MSG_WAITFORONE, nullptr); + + if (return_val <= 0) { + Debug("udp-read", "Done. 
recvmmsg() ret is %d, errno %s", return_val, strerror(errno)); + return; + } + Debug("udp-read", "recvmmsg() read %d packets", return_val); + + Ptr<IOBufferBlock> chain, next_chain; + for (auto packet_num = 0; packet_num < return_val; packet_num++) { + gso_size = 0; + + Debug("udp-read", "Processing message %d from a total of %d", packet_num, return_val); + struct msghdr &mhdr = mmsg[packet_num].msg_hdr; + + if (mhdr.msg_flags & MSG_TRUNC) { + Debug("udp-read", "The UDP packet is truncated"); + break; + } + + if (mhdr.msg_namelen <= 0) { + Debug("udp-read", "Unable to get remote address from recvmmsg() for fd: %d", uc->getFd()); + return; + } + + safe_getsockname(xuc->getFd(), reinterpret_cast<struct sockaddr *>(&toaddr[packet_num]), &toaddr_len); + if (mhdr.msg_controllen > 0) { + for (auto cmsg = CMSG_FIRSTHDR(&mhdr); cmsg != nullptr; cmsg = CMSG_NXTHDR(&mhdr, cmsg)) { + if (get_ip_address_from_cmsg(cmsg, &toaddr[packet_num])) { + break; + } +#ifdef SOL_UDP + if (UDP_GRO == cmsg->cmsg_type) { + if (nh->is_gro_enabled()) { + gso_size = *reinterpret_cast<uint16_t *>(CMSG_DATA(cmsg)); + } + break; + } +#endif + } + } + + const int64_t received = mmsg[packet_num].msg_len; + total_bytes_read += received; + + // If gro was used, then the kernel will tell us the size of each part that was spliced together. + Debug("udp-read", "Received %lld bytes. gso_size %lld (%s)", static_cast<long long>(received), static_cast<long long>(gso_size), + (gso_size > 0 ? "GRO" : "No GRO")); + + auto chain = buffer_chain[packet_num]; + IOBufferBlock *block; + int64_t remaining{received}; + while (remaining > 0) { + block = chain.get(); + int64_t this_packet_r = gso_size ? 
std::min(gso_size, received) : received; + while (block && this_packet_r > 0) { + if (this_packet_r > buffer_size) { + block->fill(buffer_size); + this_packet_r -= buffer_size; + block = block->next.get(); + remaining -= buffer_size; + } else { + block->fill(this_packet_r); + remaining -= this_packet_r; + this_packet_r = 0; + next_chain = block->next.get(); + block->next = nullptr; + } + } + Debug("udp-read", "Creating packet"); + // create packet + UDPPacket *p = + UDPPacket::new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr[packet_num]), ats_ip_sa_cast(&toaddr[packet_num]), chain); + p->setConnection(uc); + // queue onto the UDPConnection + uc->inQueue.push(p); + + // reload the unused block + chain = next_chain; + next_chain = nullptr; + } + } + Debug("udp-read", "Total bytes read %ld from %d packets.", total_bytes_read, return_val); + + // if not already on to-be-called-back queue, then add it. + if (!uc->onCallbackQueue) { + ink_assert(uc->callback_link.next == nullptr); + ink_assert(uc->callback_link.prev == nullptr); + uc->AddRef(); + nh->udp_callbacks.enqueue(uc); + uc->onCallbackQueue = 1; + } +} +#endif + +void +UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler *nh, UDPConnection *xuc) +{ +#if HAVE_RECVMMSG + read_multiple_messages_from_net(nh, xuc); +#else + read_single_message_from_net(nh, xuc); +#endif +} + int UDPNetProcessorInternal::udp_callback(UDPNetHandler *nh, UDPConnection *xuc, EThread *thread) { @@ -929,6 +1176,17 @@ UDPNetProcessor::UDPBind(Continuation *cont, sockaddr const *addr, int fd, int s } } +#ifdef SOL_UDP + if (G_udp_config.enable_gro) { + int gro = 1; + if (safe_setsockopt(fd, IPPROTO_UDP, UDP_GRO, (char *)&gro, sizeof(gro)) == 0) { + Debug("udpnet", "setsockopt UDP_GRO ok"); + } else { + Debug("udpnet", "setsockopt UDP_GRO. errno=%d", errno); + } + } +#endif + // If this is a class D address (i.e. multicast address), use REUSEADDR. 
if (ats_is_ip_multicast(addr)) { if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEADDR) < 0) { @@ -957,6 +1215,7 @@ UDPNetProcessor::UDPBind(Continuation *cont, sockaddr const *addr, int fd, int s goto Lerror; } + // check this for GRO if (recv_bufsize) { if (unlikely(SocketManager::set_rcvbuf_size(fd, recv_bufsize))) { Debug("udpnet", "set_dnsbuf_size(%d) failed", recv_bufsize); @@ -1449,13 +1708,23 @@ net_signal_hook_callback(EThread *thread) #endif } -UDPNetHandler::UDPNetHandler(bool enable_gso) : udpOutQueue(enable_gso) +UDPNetHandler::UDPNetHandler(Cfg &&cfg) : udpOutQueue(cfg.enable_gso), _cfg{std::move(cfg)} { nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000); lastCheck = 0; SET_HANDLER(&UDPNetHandler::startNetEvent); } +bool +UDPNetHandler::is_gro_enabled() const +{ +#ifndef SOL_UDP + return false; +#else + return this->_cfg.enable_gro; +#endif +} + int UDPNetHandler::startNetEvent(int event, Event *e) { diff --git a/src/records/RecordsConfig.cc b/src/records/RecordsConfig.cc index 65fd3e719c..c827cb4be1 100644 --- a/src/records/RecordsConfig.cc +++ b/src/records/RecordsConfig.cc @@ -260,7 +260,8 @@ static const RecordElement RecordsConfig[] = , {RECT_CONFIG, "proxy.config.udp.enable_gso", RECD_INT, "1", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL} , - + {RECT_CONFIG, "proxy.config.udp.enable_gro", RECD_INT, "1", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL} + , //############################################################################## //# //# Alarm Configuration
