This is an automated email from the ASF dual-hosted git repository.
dmeden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 68b6287e68 Add support for recvmmsg and UDP GRO (#9905)
68b6287e68 is described below
commit 68b6287e68f9322e78374f93876693691645ea99
Author: Damian Meden <[email protected]>
AuthorDate: Fri Jul 7 11:33:10 2023 +0100
Add support for recvmmsg and UDP GRO (#9905)
This PR adds support for UDP GRO when reading the socket using recvmsg or recvmmsg.
It also adds support for recvmmsg when the OS provides it.
A new record config is introduced for the GRO feature:
proxy.config.udp.enable_gro
---
configure.ac | 2 +-
doc/admin-guide/files/records.yaml.en.rst | 5 +
iocore/eventsystem/I_SocketManager.h | 3 +
iocore/eventsystem/P_UnixSocketManager.h | 15 +
iocore/net/I_UDPPacket.h | 2 +-
iocore/net/P_UDPNet.h | 17 +-
iocore/net/QUICPacketHandler_quiche.cc | 2 +-
iocore/net/UnixUDPNet.cc | 441 ++++++++++++++++++++++++------
src/records/RecordsConfig.cc | 3 +-
9 files changed, 399 insertions(+), 91 deletions(-)
diff --git a/configure.ac b/configure.ac
index 555a8a7baa..725ad218d7 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1390,7 +1390,7 @@ AC_CHECK_FUNCS([clock_gettime kqueue epoll_ctl
posix_fadvise posix_madvise posix
AC_CHECK_FUNCS([port_create strlcpy strlcat sysconf sysctlbyname getpagesize])
AC_CHECK_FUNCS([getreuid getresuid getresgid setreuid setresuid getpeereid
getpeerucred])
AC_CHECK_FUNCS([strsignal psignal psiginfo accept4])
-AC_CHECK_FUNCS([sendmmsg])
+AC_CHECK_FUNCS([sendmmsg recvmmsg])
# Check for eventfd() and sys/eventfd.h (both must exist ...)
AC_CHECK_HEADERS([sys/eventfd.h], [
diff --git a/doc/admin-guide/files/records.yaml.en.rst
b/doc/admin-guide/files/records.yaml.en.rst
index ed5901c4d1..e71f9d0e7f 100644
--- a/doc/admin-guide/files/records.yaml.en.rst
+++ b/doc/admin-guide/files/records.yaml.en.rst
@@ -4877,6 +4877,11 @@ UDP Configuration
and disables it automatically if it causes send errors.
+.. ts:cv:: CONFIG proxy.config.udp.enable_gro INT 1
+
+ Enables (``1``) or disables (``0``) UDP Generic Receive Offload (GRO). When enabled,
+ |TS| will try to use it when reading the UDP socket.
+
Plug-in Configuration
=====================
diff --git a/iocore/eventsystem/I_SocketManager.h
b/iocore/eventsystem/I_SocketManager.h
index 591bd2ac23..a4775b9866 100644
--- a/iocore/eventsystem/I_SocketManager.h
+++ b/iocore/eventsystem/I_SocketManager.h
@@ -82,6 +82,9 @@ int64_t pwrite(int fd, void *buf, int len, off_t offset, char
*tag = nullptr);
int send(int fd, void *buf, int len, int flags);
int sendto(int fd, void *buf, int len, int flags, struct sockaddr const *to,
int tolen);
int sendmsg(int fd, struct msghdr *m, int flags, void *pOLP = nullptr);
+#ifdef HAVE_RECVMMSG
+int recvmmsg(int fd, struct mmsghdr *msgvec, int vlen, int flags, struct
timespec *timeout, void *pOLP = nullptr);
+#endif
int64_t lseek(int fd, off_t offset, int whence);
int fsync(int fildes);
int poll(struct pollfd *fds, unsigned long nfds, int timeout);
diff --git a/iocore/eventsystem/P_UnixSocketManager.h
b/iocore/eventsystem/P_UnixSocketManager.h
index 177a3704f8..bbc8e5c546 100644
--- a/iocore/eventsystem/P_UnixSocketManager.h
+++ b/iocore/eventsystem/P_UnixSocketManager.h
@@ -119,6 +119,21 @@ SocketManager::recvmsg(int fd, struct msghdr *m, int
flags, void * /* pOLP ATS_U
return r;
}
+#ifdef HAVE_RECVMMSG
+TS_INLINE int
+SocketManager::recvmmsg(int fd, struct mmsghdr *msgvec, int vlen, int flags,
struct timespec *timeout, void * /* pOLP ATS_UNUSED */)
+{
+ int r;
+ do {
+ if (unlikely((r = ::recvmmsg(fd, msgvec, vlen, flags, timeout)) < 0)) {
+ r = -errno;
+ // EINVAL can occur if timeout is invalid.
+ }
+ } while (r == -EINTR);
+ return r;
+}
+#endif
+
TS_INLINE int64_t
SocketManager::write(int fd, void *buf, int size, void * /* pOLP ATS_UNUSED */)
{
diff --git a/iocore/net/I_UDPPacket.h b/iocore/net/I_UDPPacket.h
index 4d88eb1242..3166238c66 100644
--- a/iocore/net/I_UDPPacket.h
+++ b/iocore/net/I_UDPPacket.h
@@ -110,7 +110,7 @@ public:
Create a new packet to be delivered to application.
Internal function only
*/
- static UDPPacket *new_incoming_UDPPacket(struct sockaddr *from, struct
sockaddr *to, Ptr<IOBufferBlock> &block);
+ static UDPPacket *new_incoming_UDPPacket(struct sockaddr *from, struct
sockaddr *to, Ptr<IOBufferBlock> block);
private:
SLINK(UDPPacket, alink); // atomic link
diff --git a/iocore/net/P_UDPNet.h b/iocore/net/P_UDPNet.h
index 796e07a642..001d307b83 100644
--- a/iocore/net/P_UDPNet.h
+++ b/iocore/net/P_UDPNet.h
@@ -54,6 +54,10 @@ public:
off_t pollCont_offset;
off_t udpNetHandler_offset;
+
+private:
+ void read_single_message_from_net(UDPNetHandler *nh, UDPConnection *uc);
+ void read_multiple_messages_from_net(UDPNetHandler *nh, UDPConnection *xuc);
};
extern UDPNetProcessorInternal udpNetInternal;
@@ -310,6 +314,11 @@ void initialize_thread_for_udp_net(EThread *thread);
class UDPNetHandler : public Continuation, public EThread::LoopTailHandler
{
public:
+ struct Cfg {
+ // Segmentation offload.
+ bool enable_gso{true};
+ bool enable_gro{true};
+ };
// engine for outgoing packets
UDPQueue udpOutQueue;
@@ -333,7 +342,13 @@ public:
int waitForActivity(ink_hrtime timeout) override;
void signalActivity() override;
- UDPNetHandler(bool enable_gso);
+ UDPNetHandler(Cfg &&cfg);
+
+ // GRO
+ bool is_gro_enabled() const;
+
+private:
+ Cfg _cfg; // Note: may not be the best place to put this, but for now is ok.
};
static inline PollCont *
diff --git a/iocore/net/QUICPacketHandler_quiche.cc
b/iocore/net/QUICPacketHandler_quiche.cc
index 10f6223d9e..b09b349464 100644
--- a/iocore/net/QUICPacketHandler_quiche.cc
+++ b/iocore/net/QUICPacketHandler_quiche.cc
@@ -207,7 +207,7 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket
*udp_packet)
int rc = quiche_header_info(buf, buf_len, QUICConnectionId::SCID_LEN,
&version, &type, scid, &scid_len, dcid, &dcid_len, token,
&token_len);
if (rc < 0) {
- QUICDebug("Ignore packet - failed to parse header");
+ QUICDebug("Ignore packet - failed to parse header: '%d'", rc);
udp_packet->free();
return;
}
diff --git a/iocore/net/UnixUDPNet.cc b/iocore/net/UnixUDPNet.cc
index 6ecfdd3ac2..97031ad81a 100644
--- a/iocore/net/UnixUDPNet.cc
+++ b/iocore/net/UnixUDPNet.cc
@@ -37,20 +37,32 @@
#include "P_DNSConnection.h"
#include "P_Net.h"
#include "P_UDPNet.h"
+#include "tscore/ink_inet.h"
#include "tscore/ink_sock.h"
+#include <netinet/udp.h>
-#include "netinet/udp.h"
#ifndef UDP_SEGMENT
// This is needed because old glibc may not have the constant even if Kernel
supports it.
#define UDP_SEGMENT 103
#endif
+#ifndef UDP_GRO
+#define UDP_GRO 104
+#endif
+
using UDPNetContHandler = int (UDPNetHandler::*)(int, void *);
ClassAllocator<UDPPacket> udpPacketAllocator("udpPacketAllocator");
EventType ET_UDP;
+namespace
+{
+#ifdef HAVE_RECVMMSG
+const uint32_t MAX_RECEIVE_MSG_PER_CALL{16}; //< VLEN parameter for the
recvmmsg call.
+#endif
+}; // namespace
+
UDPPacket *
UDPPacket::new_UDPPacket()
{
@@ -73,7 +85,7 @@ UDPPacket::new_UDPPacket(struct sockaddr const *to,
ink_hrtime when, Ptr<IOBuffe
}
UDPPacket *
-UDPPacket::new_incoming_UDPPacket(struct sockaddr *from, struct sockaddr *to,
Ptr<IOBufferBlock> &block)
+UDPPacket::new_incoming_UDPPacket(struct sockaddr *from, struct sockaddr *to,
Ptr<IOBufferBlock> block)
{
UDPPacket *p = udpPacketAllocator.alloc();
@@ -150,8 +162,6 @@ UDPPacket::get_entire_chain_buffer(size_t *buf_len)
if (this->_payload == nullptr) {
IOBufferBlock *block = this->getIOBlockChain();
- // No need to allocate, we will use the first slice. With the current
slice size
- // 2048 more likely we will using this block most of the time.
if (block && block->next.get() == nullptr) {
*buf_len = this->getPktLength();
return reinterpret_cast<uint8_t *>(block->buf());
@@ -183,6 +193,12 @@ int32_t g_udp_periodicCleanupSlots;
int32_t g_udp_periodicFreeCancelledPkts;
int32_t g_udp_numSendRetries;
+namespace
+{
+// We need to set some flags when parts of this config are enabled, so we keep
+// this object global.
+UDPNetHandler::Cfg G_udp_config;
+} // namespace
//
// Public functions
// See header for documentation
@@ -193,12 +209,23 @@ sockaddr_in6 G_bwGrapherLoc;
void
initialize_thread_for_udp_net(EThread *thread)
{
- int enable_gso;
+ int enable_gso, enable_gro;
REC_ReadConfigInteger(enable_gso, "proxy.config.udp.enable_gso");
+ REC_ReadConfigInteger(enable_gro, "proxy.config.udp.enable_gro");
UDPNetHandler *nh = get_UDPNetHandler(thread);
- new (reinterpret_cast<ink_dummy_for_new *>(nh)) UDPNetHandler(enable_gso);
+ UDPNetHandler::Cfg cfg{static_cast<bool>(enable_gso),
static_cast<bool>(enable_gro)};
+
+ G_udp_config = cfg; // keep a global copy.
+ new (reinterpret_cast<ink_dummy_for_new *>(nh))
UDPNetHandler(std::move(cfg));
+
+#ifndef SOL_UDP
+ if (G_udp_config.enable_gro) {
+ Warning("Attempted to use UDP GRO per configuration, but it is
unavailable");
+ }
+#endif
+
new (reinterpret_cast<ink_dummy_for_new *>(get_UDPPollCont(thread)))
PollCont(thread->mutex);
// The UDPNetHandler cannot be accessed across EThreads.
// Because the UDPNetHandler should be called back immediately after
UDPPollCont.
@@ -259,8 +286,74 @@ UDPNetProcessorInternal::start(int n_upd_threads, size_t
stacksize)
return 0;
}
+namespace
+{
+bool
+get_ip_address_from_cmsg(struct cmsghdr *cmsg, sockaddr_in6 *toaddr)
+{
+#ifdef IP_PKTINFO
+ if (IP_PKTINFO == cmsg->cmsg_type) {
+ if (cmsg->cmsg_level == IPPROTO_IP) {
+ struct in_pktinfo *pktinfo =
reinterpret_cast<struct in_pktinfo *>(CMSG_DATA(cmsg));
+ reinterpret_cast<sockaddr_in *>(toaddr)->sin_addr.s_addr =
pktinfo->ipi_addr.s_addr;
+ }
+ return true;
+ }
+#endif
+#ifdef IP_RECVDSTADDR
+ if (IP_RECVDSTADDR == cmsg->cmsg_type) {
+ if (cmsg->cmsg_level == IPPROTO_IP) {
+ struct in_addr *addr =
reinterpret_cast<struct in_addr *>(CMSG_DATA(cmsg));
+ reinterpret_cast<sockaddr_in *>(toaddr)->sin_addr.s_addr = addr->s_addr;
+ }
+ return true;
+ }
+#endif
+#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO)
+ if (IPV6_PKTINFO == cmsg->cmsg_type) { // IPV6_RECVPKTINFO uses IPV6_PKTINFO
too
+ if (cmsg->cmsg_level == IPPROTO_IPV6) {
+ struct in6_pktinfo *pktinfo = reinterpret_cast<struct in6_pktinfo
*>(CMSG_DATA(cmsg));
+ memcpy(toaddr->sin6_addr.s6_addr, &pktinfo->ipi6_addr, 16);
+ }
+ return true;
+ }
+#endif
+ return false;
+}
+
+unsigned int
+build_iovec_block_chain(unsigned max_niov, int64_t size_index,
Ptr<IOBufferBlock> &chain, struct iovec *out_tiovec)
+{
+ unsigned int niov;
+ IOBufferBlock *b, *last;
+
+ // build struct iov
+ // reuse the block in chain if available
+ b = chain.get();
+ last = nullptr;
+ for (niov = 0; niov < max_niov; niov++) {
+ if (b == nullptr) {
+ b = new_IOBufferBlock();
+ b->alloc(size_index);
+ if (last == nullptr) {
+ chain = b;
+ } else {
+ last->next = b;
+ }
+ }
+
+ out_tiovec[niov].iov_base = b->buf();
+ out_tiovec[niov].iov_len = b->block_size();
+
+ last = b;
+ b = b->next.get();
+ }
+ return niov;
+}
+} // namespace
+
void
-UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler *nh, UDPConnection
*xuc)
+UDPNetProcessorInternal::read_single_message_from_net(UDPNetHandler *nh,
UDPConnection *xuc)
{
UnixUDPConnection *uc = (UnixUDPConnection *)xuc;
@@ -269,7 +362,7 @@ UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler
*nh, UDPConnection *xuc
int64_t r;
int iters = 0;
unsigned max_niov = 32;
-
+ int64_t gso_size{0}; // in case is available.
struct msghdr msg;
Ptr<IOBufferBlock> chain, next_chain;
struct iovec tiovec[max_niov];
@@ -280,42 +373,37 @@ UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler
*nh, UDPConnection *xuc
// And there is 8 octets in 'User Datagram Header' which means the max
length of payload is no more than 65527 bytes.
do {
// create IOBufferBlock chain to receive data
- unsigned int niov;
- IOBufferBlock *b, *last;
-
- // build struct iov
- // reuse the block in chain if available
- b = chain.get();
- last = nullptr;
- for (niov = 0; niov < max_niov; niov++) {
- if (b == nullptr) {
- b = new_IOBufferBlock();
- b->alloc(size_index);
- if (last == nullptr) {
- chain = b;
- } else {
- last->next = b;
- }
- }
-
- tiovec[niov].iov_base = b->buf();
- tiovec[niov].iov_len = b->block_size();
-
- last = b;
- b = b->next.get();
- }
+ unsigned int niov = build_iovec_block_chain(max_niov, size_index, chain,
tiovec);
// build struct msghdr
sockaddr_in6 fromaddr;
sockaddr_in6 toaddr;
- int toaddr_len = sizeof(toaddr);
- char *cbuf[1024];
- msg.msg_name = &fromaddr;
- msg.msg_namelen = sizeof(fromaddr);
- msg.msg_iov = tiovec;
- msg.msg_iovlen = niov;
- msg.msg_control = cbuf;
- msg.msg_controllen = sizeof(cbuf);
+ int toaddr_len = sizeof(toaddr);
+ msg.msg_name = &fromaddr;
+ msg.msg_namelen = sizeof(fromaddr);
+ msg.msg_iov = tiovec;
+ msg.msg_iovlen = niov;
+
+ static const size_t cmsg_size
+ {
+ CMSG_SPACE(sizeof(int))
+#ifdef IP_PKTINFO
+ +CMSG_SPACE(sizeof(struct in_pktinfo))
+#endif
+#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO)
+ + CMSG_SPACE(sizeof(struct in6_pktinfo))
+#endif
+#ifdef IP_RECVDSTADDR
+ + CMSG_SPACE(sizeof(struct in_addr))
+#endif
+#ifdef UDP_GRO
+ + CMSG_SPACE(sizeof(uint16_t))
+#endif
+ };
+
+ char control[cmsg_size] = {0};
+ msg.msg_control = control;
+ msg.msg_controllen = cmsg_size;
// receive data by recvmsg
r = SocketManager::recvmsg(uc->getFd(), &msg, 0);
@@ -329,61 +417,56 @@ UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler
*nh, UDPConnection *xuc
Debug("udp-read", "The UDP packet is truncated");
}
- // fill the IOBufferBlock chain
- int64_t saved = r;
- b = chain.get();
- while (b && saved > 0) {
- if (saved > buffer_size) {
- b->fill(buffer_size);
- saved -= buffer_size;
- b = b->next.get();
- } else {
- b->fill(saved);
- saved = 0;
- next_chain = b->next.get();
- b->next = nullptr;
- }
- }
-
safe_getsockname(xuc->getFd(), reinterpret_cast<struct sockaddr
*>(&toaddr), &toaddr_len);
for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg =
CMSG_NXTHDR(&msg, cmsg)) {
- switch (cmsg->cmsg_type) {
-#ifdef IP_PKTINFO
- case IP_PKTINFO:
- if (cmsg->cmsg_level == IPPROTO_IP) {
- struct in_pktinfo *pktinfo =
reinterpret_cast<struct in_pktinfo *>(CMSG_DATA(cmsg));
- reinterpret_cast<sockaddr_in *>(&toaddr)->sin_addr.s_addr =
pktinfo->ipi_addr.s_addr;
- }
+ if (get_ip_address_from_cmsg(cmsg, &toaddr)) {
break;
-#endif
-#ifdef IP_RECVDSTADDR
- case IP_RECVDSTADDR:
- if (cmsg->cmsg_level == IPPROTO_IP) {
- struct in_addr *addr =
reinterpret_cast<struct in_addr *>(CMSG_DATA(cmsg));
- reinterpret_cast<sockaddr_in *>(&toaddr)->sin_addr.s_addr =
addr->s_addr;
+ }
+#ifdef SOL_UDP
+ if (UDP_GRO == cmsg->cmsg_type) {
+ if (nh->is_gro_enabled()) {
+ gso_size = *reinterpret_cast<uint16_t *>(CMSG_DATA(cmsg));
}
break;
+ }
#endif
-#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO)
- case IPV6_PKTINFO: // IPV6_RECVPKTINFO uses IPV6_PKTINFO too
- if (cmsg->cmsg_level == IPPROTO_IPV6) {
- struct in6_pktinfo *pktinfo = reinterpret_cast<struct in6_pktinfo
*>(CMSG_DATA(cmsg));
- memcpy(toaddr.sin6_addr.s6_addr, &pktinfo->ipi6_addr, 16);
+ }
+
+ // If gro was used, then the kernel will tell us the size of each part
that was spliced together.
+ Debug("udp-read", "Received %lld bytes. gso_size %lld (%s)",
static_cast<long long>(r), static_cast<long long>(gso_size),
+ (gso_size > 0 ? "GRO" : "No GRO"));
+
+ IOBufferBlock *block;
+ int64_t remaining{r};
+ while (remaining > 0) {
+ block = chain.get();
+ int64_t this_packet_r = gso_size ? std::min(gso_size, r) : r;
+ while (block && this_packet_r > 0) {
+ if (this_packet_r > buffer_size) {
+ block->fill(buffer_size);
+ this_packet_r -= buffer_size;
+ block = block->next.get();
+ remaining -= buffer_size;
+ } else {
+ block->fill(this_packet_r);
+ remaining -= this_packet_r;
+ this_packet_r = 0;
+ next_chain = block->next.get();
+ block->next = nullptr;
}
- break;
-#endif
}
+ Debug("udp-read", "Creating packet");
+ // create packet
+ UDPPacket *p =
UDPPacket::new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr),
ats_ip_sa_cast(&toaddr), chain);
+ p->setConnection(uc);
+ // queue onto the UDPConnection
+ uc->inQueue.push(p);
+
+ // reload the unused block
+ chain = next_chain;
+ next_chain = nullptr;
}
- // create packet
- UDPPacket *p =
UDPPacket::new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr),
ats_ip_sa_cast(&toaddr), chain);
- p->setConnection(uc);
- // queue onto the UDPConnection
- uc->inQueue.push(p);
-
- // reload the unused block
- chain = next_chain;
- next_chain = nullptr;
iters++;
} while (r > 0);
if (iters >= 1) {
@@ -399,6 +482,170 @@ UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler
*nh, UDPConnection *xuc
}
}
+#ifdef HAVE_RECVMMSG
+void
+UDPNetProcessorInternal::read_multiple_messages_from_net(UDPNetHandler *nh,
UDPConnection *xuc)
+{
+ UnixUDPConnection *uc = (UnixUDPConnection *)xuc;
+
+ std::array<Ptr<IOBufferBlock>, MAX_RECEIVE_MSG_PER_CALL> buffer_chain;
+ unsigned max_niov = 32;
+ int64_t gso_size{0}; // In case is available
+
+ struct mmsghdr mmsg[MAX_RECEIVE_MSG_PER_CALL];
+ struct iovec tiovec[MAX_RECEIVE_MSG_PER_CALL][max_niov];
+
+ // Addresses
+ sockaddr_in6 fromaddr[MAX_RECEIVE_MSG_PER_CALL];
+ sockaddr_in6 toaddr[MAX_RECEIVE_MSG_PER_CALL];
+ int toaddr_len = sizeof(toaddr);
+
+ size_t total_bytes_read{0};
+
+ static const size_t cmsg_size
+ {
+ CMSG_SPACE(sizeof(int))
+#ifdef IP_PKTINFO
+ +CMSG_SPACE(sizeof(struct in_pktinfo))
+#endif
+#if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO)
+ + CMSG_SPACE(sizeof(struct in6_pktinfo))
+#endif
+#ifdef IP_RECVDSTADDR
+ + CMSG_SPACE(sizeof(struct in_addr))
+#endif
+#ifdef UDP_GRO
+ + CMSG_SPACE(sizeof(uint16_t))
+#endif
+ };
+
+ // Ancillary data.
+ char control[MAX_RECEIVE_MSG_PER_CALL][cmsg_size];
+
+ int64_t size_index = BUFFER_SIZE_INDEX_2K;
+ int64_t buffer_size = BUFFER_SIZE_FOR_INDEX(size_index);
+ // The max length of receive buffer is 32 * buffer_size (2048) = 65536 bytes.
+ // Because the 'UDP Length' is type of uint16_t defined in RFC 768.
+ // And there is 8 octets in 'User Datagram Header' which means the max
length of payload is no more than 65527 bytes.
+
+ for (uint32_t msg_num = 0; msg_num < MAX_RECEIVE_MSG_PER_CALL; msg_num++) {
+ // build each block chain.
+ unsigned int niov = build_iovec_block_chain(max_niov, size_index,
buffer_chain[msg_num], tiovec[msg_num]);
+
+ mmsg[msg_num].msg_hdr.msg_iov = tiovec[msg_num];
+ mmsg[msg_num].msg_hdr.msg_iovlen = niov;
+ mmsg[msg_num].msg_hdr.msg_name = &fromaddr[msg_num];
+ mmsg[msg_num].msg_hdr.msg_namelen = sizeof(fromaddr[msg_num]);
+ memset(control[msg_num], 0, cmsg_size);
+ mmsg[msg_num].msg_hdr.msg_control = control[msg_num];
+ mmsg[msg_num].msg_hdr.msg_controllen = cmsg_size;
+ }
+
+ const int return_val = SocketManager::recvmmsg(uc->getFd(), mmsg,
MAX_RECEIVE_MSG_PER_CALL, MSG_WAITFORONE, nullptr);
+
+ if (return_val <= 0) {
+ Debug("udp-read", "Done. recvmmsg() ret is %d, errno %s", return_val,
strerror(errno));
+ return;
+ }
+ Debug("udp-read", "recvmmsg() read %d packets", return_val);
+
+ Ptr<IOBufferBlock> chain, next_chain;
+ for (auto packet_num = 0; packet_num < return_val; packet_num++) {
+ gso_size = 0;
+
+ Debug("udp-read", "Processing message %d from a total of %d", packet_num,
return_val);
+ struct msghdr &mhdr = mmsg[packet_num].msg_hdr;
+
+ if (mhdr.msg_flags & MSG_TRUNC) {
+ Debug("udp-read", "The UDP packet is truncated");
+ break;
+ }
+
+ if (mhdr.msg_namelen <= 0) {
+ Debug("udp-read", "Unable to get remote address from recvmmsg() for fd:
%d", uc->getFd());
+ return;
+ }
+
+ safe_getsockname(xuc->getFd(), reinterpret_cast<struct sockaddr
*>(&toaddr[packet_num]), &toaddr_len);
+ if (mhdr.msg_controllen > 0) {
+ for (auto cmsg = CMSG_FIRSTHDR(&mhdr); cmsg != nullptr; cmsg =
CMSG_NXTHDR(&mhdr, cmsg)) {
+ if (get_ip_address_from_cmsg(cmsg, &toaddr[packet_num])) {
+ break;
+ }
+#ifdef SOL_UDP
+ if (UDP_GRO == cmsg->cmsg_type) {
+ if (nh->is_gro_enabled()) {
+ gso_size = *reinterpret_cast<uint16_t *>(CMSG_DATA(cmsg));
+ }
+ break;
+ }
+#endif
+ }
+ }
+
+ const int64_t received = mmsg[packet_num].msg_len;
+ total_bytes_read += received;
+
+ // If gro was used, then the kernel will tell us the size of each part
that was spliced together.
+ Debug("udp-read", "Received %lld bytes. gso_size %lld (%s)",
static_cast<long long>(received), static_cast<long long>(gso_size),
+ (gso_size > 0 ? "GRO" : "No GRO"));
+
+ auto chain = buffer_chain[packet_num];
+ IOBufferBlock *block;
+ int64_t remaining{received};
+ while (remaining > 0) {
+ block = chain.get();
+ int64_t this_packet_r = gso_size ? std::min(gso_size, received) :
received;
+ while (block && this_packet_r > 0) {
+ if (this_packet_r > buffer_size) {
+ block->fill(buffer_size);
+ this_packet_r -= buffer_size;
+ block = block->next.get();
+ remaining -= buffer_size;
+ } else {
+ block->fill(this_packet_r);
+ remaining -= this_packet_r;
+ this_packet_r = 0;
+ next_chain = block->next.get();
+ block->next = nullptr;
+ }
+ }
+ Debug("udp-read", "Creating packet");
+ // create packet
+ UDPPacket *p =
+
UDPPacket::new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr[packet_num]),
ats_ip_sa_cast(&toaddr[packet_num]), chain);
+ p->setConnection(uc);
+ // queue onto the UDPConnection
+ uc->inQueue.push(p);
+
+ // reload the unused block
+ chain = next_chain;
+ next_chain = nullptr;
+ }
+ }
+ Debug("udp-read", "Total bytes read %ld from %d packets.", total_bytes_read,
return_val);
+
+ // if not already on to-be-called-back queue, then add it.
+ if (!uc->onCallbackQueue) {
+ ink_assert(uc->callback_link.next == nullptr);
+ ink_assert(uc->callback_link.prev == nullptr);
+ uc->AddRef();
+ nh->udp_callbacks.enqueue(uc);
+ uc->onCallbackQueue = 1;
+ }
+}
+#endif
+
+void
+UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler *nh, UDPConnection
*xuc)
+{
+#if HAVE_RECVMMSG
+ read_multiple_messages_from_net(nh, xuc);
+#else
+ read_single_message_from_net(nh, xuc);
+#endif
+}
+
int
UDPNetProcessorInternal::udp_callback(UDPNetHandler *nh, UDPConnection *xuc,
EThread *thread)
{
@@ -929,6 +1176,17 @@ UDPNetProcessor::UDPBind(Continuation *cont, sockaddr
const *addr, int fd, int s
}
}
+#ifdef SOL_UDP
+ if (G_udp_config.enable_gro) {
+ int gro = 1;
+ if (safe_setsockopt(fd, IPPROTO_UDP, UDP_GRO, (char *)&gro, sizeof(gro))
== 0) {
+ Debug("udpnet", "setsockopt UDP_GRO ok");
+ } else {
+ Debug("udpnet", "setsockopt UDP_GRO. errno=%d", errno);
+ }
+ }
+#endif
+
// If this is a class D address (i.e. multicast address), use REUSEADDR.
if (ats_is_ip_multicast(addr)) {
if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEADDR) < 0) {
@@ -957,6 +1215,7 @@ UDPNetProcessor::UDPBind(Continuation *cont, sockaddr
const *addr, int fd, int s
goto Lerror;
}
+ // check this for GRO
if (recv_bufsize) {
if (unlikely(SocketManager::set_rcvbuf_size(fd, recv_bufsize))) {
Debug("udpnet", "set_dnsbuf_size(%d) failed", recv_bufsize);
@@ -1449,13 +1708,23 @@ net_signal_hook_callback(EThread *thread)
#endif
}
-UDPNetHandler::UDPNetHandler(bool enable_gso) : udpOutQueue(enable_gso)
+UDPNetHandler::UDPNetHandler(Cfg &&cfg) : udpOutQueue(cfg.enable_gso),
_cfg{std::move(cfg)}
{
nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000);
lastCheck = 0;
SET_HANDLER(&UDPNetHandler::startNetEvent);
}
+bool
+UDPNetHandler::is_gro_enabled() const
+{
+#ifndef SOL_UDP
+ return false;
+#else
+ return this->_cfg.enable_gro;
+#endif
+}
+
int
UDPNetHandler::startNetEvent(int event, Event *e)
{
diff --git a/src/records/RecordsConfig.cc b/src/records/RecordsConfig.cc
index 65fd3e719c..c827cb4be1 100644
--- a/src/records/RecordsConfig.cc
+++ b/src/records/RecordsConfig.cc
@@ -260,7 +260,8 @@ static const RecordElement RecordsConfig[] =
,
{RECT_CONFIG, "proxy.config.udp.enable_gso", RECD_INT, "1", RECU_NULL,
RR_NULL, RECC_NULL, nullptr, RECA_NULL}
,
-
+ {RECT_CONFIG, "proxy.config.udp.enable_gro", RECD_INT, "1", RECU_NULL,
RR_NULL, RECC_NULL, nullptr, RECA_NULL}
+ ,
//##############################################################################
//#
//# Alarm Configuration