This is an automated email from the ASF dual-hosted git repository.
dmeden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 9777b5a88c UDP-Net: Add support for udp pacing if available. (#11330)
9777b5a88c is described below
commit 9777b5a88c32d54f324c21d3b6ece59a280e226f
Author: Damian Meden <[email protected]>
AuthorDate: Thu May 9 22:38:55 2024 +0200
UDP-Net: Add support for udp pacing if available. (#11330)
This change uses SO_TXTIME to set a hint to schedule transmission
of specific packets at specific times. We use the hint given by
the quiche library when quiche_conn_send is invoked.
---
CMakeLists.txt | 9 +++
include/iocore/net/UDPPacket.h | 6 +-
include/tscore/ink_config.h.cmake.in | 1 +
src/iocore/net/P_QUICPacketHandler.h | 3 +-
src/iocore/net/QUICNetVConnection.cc | 10 ++-
src/iocore/net/QUICPacketHandler.cc | 5 +-
src/iocore/net/UnixUDPNet.cc | 123 ++++++++++++++++++++++++++++++-----
7 files changed, 137 insertions(+), 20 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index e127bab5c2..2eefcebcdb 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -407,6 +407,15 @@ if(TS_USE_HWLOC)
)
endif()
+check_symbol_exists(SO_TXTIME "sys/socket.h" SO_TXTIME_FOUND)
+set(CMAKE_EXTRA_INCLUDE_FILES "linux/net_tstamp.h")
+check_type_size("struct sock_txtime" STRUCT_SOCK_TXTIME_FOUND)
+unset(CMAKE_EXTRA_INCLUDE_FILES)
+
+if(SO_TXTIME_FOUND AND STRUCT_SOCK_TXTIME_FOUND)
+ set(HAVE_SO_TXTIME TRUE)
+endif()
+
option(USE_IOURING "Use experimental io_uring (linux only)" 0)
if(HAVE_IOURING AND USE_IOURING)
message(STATUS "Using io_uring")
diff --git a/include/iocore/net/UDPPacket.h b/include/iocore/net/UDPPacket.h
index b3a9dac1a2..349e3162e2 100644
--- a/include/iocore/net/UDPPacket.h
+++ b/include/iocore/net/UDPPacket.h
@@ -40,6 +40,9 @@ struct UDPPacketInternal {
int reqGenerationNum = 0;
ink_hrtime delivery_time = 0; // when to deliver packet
+#ifdef HAVE_SO_TXTIME
+ struct timespec send_at{}; // zero-initialized: senders treat tv_sec > 0 as "hint set"
+#endif
Ptr<IOBufferBlock> chain;
Continuation *cont = nullptr; // callback on error
@@ -102,7 +105,8 @@ public:
@param buf IOBufferBlock chain of data to use
@param segment_size Segment size
*/
- static UDPPacket *new_UDPPacket(struct sockaddr const *to, ink_hrtime when,
Ptr<IOBufferBlock> &buf, uint16_t segment_size = 0);
+ static UDPPacket *new_UDPPacket(struct sockaddr const *to, ink_hrtime when,
Ptr<IOBufferBlock> &buf, uint16_t segment_size = 0,
+ struct timespec *send_at_hint = nullptr);
/**
Create a new packet to be delivered to application.
diff --git a/include/tscore/ink_config.h.cmake.in
b/include/tscore/ink_config.h.cmake.in
index 5e33ca19de..3f9d848632 100644
--- a/include/tscore/ink_config.h.cmake.in
+++ b/include/tscore/ink_config.h.cmake.in
@@ -69,6 +69,7 @@
#cmakedefine01 HAVE_STRSIGNAL
#cmakedefine01 HAVE_SYSINFO
#cmakedefine01 HAVE_PRCTL
+#cmakedefine HAVE_SO_TXTIME 1
#cmakedefine01 HAVE_HWLOC_OBJ_PU
diff --git a/src/iocore/net/P_QUICPacketHandler.h
b/src/iocore/net/P_QUICPacketHandler.h
index a825db65e4..9a34866e79 100644
--- a/src/iocore/net/P_QUICPacketHandler.h
+++ b/src/iocore/net/P_QUICPacketHandler.h
@@ -40,7 +40,8 @@ public:
QUICPacketHandler();
virtual ~QUICPacketHandler();
- void send_packet(UDPConnection *udp_con, IpEndpoint &addr,
Ptr<IOBufferBlock> udp_payload, uint16_t segment_size = 0);
+ void send_packet(UDPConnection *udp_con, IpEndpoint &addr,
Ptr<IOBufferBlock> udp_payload, uint16_t segment_size = 0,
+ struct timespec *send_at_hint = nullptr);
void close_connection(QUICNetVConnection *conn);
protected:
diff --git a/src/iocore/net/QUICNetVConnection.cc
b/src/iocore/net/QUICNetVConnection.cc
index 203eec6dda..a52b9320a7 100644
--- a/src/iocore/net/QUICNetVConnection.cc
+++ b/src/iocore/net/QUICNetVConnection.cc
@@ -670,6 +670,7 @@ QUICNetVConnection::_handle_write_ready()
Ptr<IOBufferBlock> udp_payload;
quiche_send_info send_info;
+ struct timespec send_at_hint;
ssize_t res;
ssize_t written = 0;
@@ -683,6 +684,13 @@ QUICNetVConnection::_handle_write_ready()
while (written + max_udp_payload_size <= quantum) {
res = quiche_conn_send(this->_quiche_con, reinterpret_cast<uint8_t
*>(udp_payload->end()) + written, max_udp_payload_size,
&send_info);
+
+#ifdef HAVE_SO_TXTIME
+ if (written == 0) {
+ memcpy(&send_at_hint, &send_info.at, sizeof(struct timespec));
+ }
+#endif
+
if (res > 0) {
written += res;
}
@@ -696,7 +704,7 @@ QUICNetVConnection::_handle_write_ready()
if (static_cast<size_t>(written) > max_udp_payload_size) {
segment_size = max_udp_payload_size;
}
- this->_packet_handler->send_packet(this->_udp_con, this->con.addr,
udp_payload, segment_size);
+ this->_packet_handler->send_packet(this->_udp_con, this->con.addr,
udp_payload, segment_size, &send_at_hint);
net_activity(this, this_ethread());
}
}
diff --git a/src/iocore/net/QUICPacketHandler.cc
b/src/iocore/net/QUICPacketHandler.cc
index 8576553148..d336fcd57e 100644
--- a/src/iocore/net/QUICPacketHandler.cc
+++ b/src/iocore/net/QUICPacketHandler.cc
@@ -80,9 +80,10 @@ QUICPacketHandler::close_connection(QUICNetVConnection *conn)
}
void
-QUICPacketHandler::send_packet(UDPConnection *udp_con, IpEndpoint &addr,
Ptr<IOBufferBlock> udp_payload, uint16_t segment_size)
+QUICPacketHandler::send_packet(UDPConnection *udp_con, IpEndpoint &addr,
Ptr<IOBufferBlock> udp_payload, uint16_t segment_size,
+ struct timespec *send_at_hint)
{
- UDPPacket *udp_packet = UDPPacket::new_UDPPacket(addr, 0, udp_payload,
segment_size);
+ UDPPacket *udp_packet = UDPPacket::new_UDPPacket(addr, 0, udp_payload,
segment_size, send_at_hint);
if (dbg_ctl_v.on()) {
ip_port_text_buffer ipb;
diff --git a/src/iocore/net/UnixUDPNet.cc b/src/iocore/net/UnixUDPNet.cc
index 1d926ee7bf..92f44fd0e6 100644
--- a/src/iocore/net/UnixUDPNet.cc
+++ b/src/iocore/net/UnixUDPNet.cc
@@ -45,6 +45,9 @@
#include "tscore/ink_sock.h"
#include <netinet/udp.h>
#include "P_UnixNet.h"
+#ifdef HAVE_SO_TXTIME
+#include <linux/net_tstamp.h>
+#endif
#ifndef UDP_SEGMENT
// This is needed because old glibc may not have the constant even if Kernel
supports it.
@@ -80,7 +83,8 @@ UDPPacket::new_UDPPacket()
}
UDPPacket *
-UDPPacket::new_UDPPacket(struct sockaddr const *to, ink_hrtime when,
Ptr<IOBufferBlock> &buf, uint16_t segment_size)
+UDPPacket::new_UDPPacket(struct sockaddr const *to, ink_hrtime when,
Ptr<IOBufferBlock> &buf, uint16_t segment_size,
+ struct timespec *send_at_hint)
{
UDPPacket *p = udpPacketAllocator.alloc();
@@ -91,6 +95,11 @@ UDPPacket::new_UDPPacket(struct sockaddr const *to,
ink_hrtime when, Ptr<IOBuffe
ats_ip_copy(&p->to, to);
p->p.chain = buf;
p->p.segment_size = segment_size;
+#ifdef HAVE_SO_TXTIME
+ if (send_at_hint) {
+ memcpy(&p->p.send_at, send_at_hint, sizeof(struct timespec)); // copy the pointed-to timespec, not the pointer variable
+ }
+#endif
return p;
}
@@ -1208,6 +1217,15 @@ UDPNetProcessor::UDPBind(Continuation *cont, sockaddr
const *addr, int fd, int s
}
#endif
+#ifdef HAVE_SO_TXTIME
+ struct sock_txtime sk_txtime;
+
+ sk_txtime.clockid = CLOCK_MONOTONIC;
+ sk_txtime.flags = 0;
+ if (setsockopt(fd, SOL_SOCKET, SO_TXTIME, &sk_txtime, sizeof(sk_txtime)) ==
-1) {
+ Dbg(dbg_ctl_udpnet, "Failed to setsockopt SO_TXTIME. errno=%d", errno);
+ }
+#endif
// If this is a class D address (i.e. multicast address), use REUSEADDR.
if (ats_is_ip_multicast(addr)) {
if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEADDR) < 0) {
@@ -1423,6 +1441,34 @@ UDPQueue::SendUDPPacket(UDPPacket *p)
msg.msg_name = reinterpret_cast<caddr_t>(&p->to.sa);
msg.msg_namelen = ats_ip_size(p->to);
+#if defined(SOL_UDP) || defined(HAVE_SO_TXTIME) // to avoid unused variables
compiler warning.
+ struct cmsghdr *cm = nullptr;
+ uint8_t msg_ctrl[0
+#ifdef SOL_UDP
+ + CMSG_SPACE(sizeof(uint16_t))
+#endif
+#ifdef HAVE_SO_TXTIME
+ + CMSG_SPACE(sizeof(uint64_t))
+#endif
+ ];
+ memset(msg_ctrl, 0, sizeof(msg_ctrl));
+#endif // defined(SOL_UDP) || defined(HAVE_SO_TXTIME)
+
+#ifdef HAVE_SO_TXTIME
+ if (p->p.send_at.tv_sec > 0) {
+ msg.msg_control = msg_ctrl;
+ msg.msg_controllen = CMSG_SPACE(sizeof(uint64_t));
+ cm = CMSG_FIRSTHDR(&msg);
+
+ cm->cmsg_level = SOL_SOCKET;
+ cm->cmsg_type = SCM_TXTIME;
+ cm->cmsg_len = CMSG_LEN(sizeof(uint64_t));
+
+ // Convert struct timespec to nanoseconds.
+ *((uint64_t *)CMSG_DATA(cm)) = p->p.send_at.tv_sec * (1000ULL * 1000 *
1000) + p->p.send_at.tv_nsec;
+ }
+#endif
+
if (p->p.segment_size > 0) {
ink_assert(p->p.chain->next == nullptr);
msg.msg_iov = iov;
@@ -1436,10 +1482,16 @@ UDPQueue::SendUDPPacket(UDPPacket *p)
char buf[CMSG_SPACE(sizeof(uint16_t))];
struct cmsghdr align;
} u;
- msg.msg_control = u.buf;
- msg.msg_controllen = sizeof(u.buf);
- struct cmsghdr *cm = CMSG_FIRSTHDR(&msg);
+ if (cm == nullptr) {
+ msg.msg_control = msg_ctrl;
+ msg.msg_controllen = sizeof(u.buf);
+ cm = CMSG_FIRSTHDR(&msg);
+
+ } else {
+ msg.msg_controllen += sizeof(u.buf);
+ cm = CMSG_NXTHDR(&msg, cm);
+ }
cm->cmsg_level = SOL_UDP;
cm->cmsg_type = UDP_SEGMENT;
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
@@ -1564,9 +1616,23 @@ UDPQueue::SendMultipleUDPPackets(UDPPacket **p, uint16_t
n)
} else {
msgvec_size = sizeof(struct mmsghdr) * n * 64;
}
+
#else
msgvec_size = sizeof(struct mmsghdr) * n * 64;
#endif
+
+#if defined(SOL_UDP) || defined(HAVE_SO_TXTIME) // to avoid unused variables
compiler warning.
+ uint8_t msg_ctrl[0
+#ifdef SOL_UDP
+ + CMSG_SPACE(sizeof(uint16_t))
+#endif
+#ifdef HAVE_SO_TXTIME
+ + CMSG_SPACE(sizeof(uint64_t))
+#endif
+ ];
+ memset(msg_ctrl, 0, sizeof(msg_ctrl));
+#endif // defined(SOL_UDP) || defined(HAVE_SO_TXTIME)
+
// The sizeof(struct msghdr) is 56 bytes or so. It can be too big to stack
(alloca).
IOBufferBlock *tmp = new_IOBufferBlock();
tmp->alloc(iobuffer_size_to_index(msgvec_size, BUFFER_SIZE_INDEX_1M));
@@ -1592,6 +1658,26 @@ UDPQueue::SendMultipleUDPPackets(UDPPacket **p, uint16_t
n)
packet = p[i];
packet->p.conn->lastSentPktStartTime = packet->p.delivery_time;
ink_assert(packet->p.conn->getFd() == fd);
+#if defined(SOL_UDP) || defined(HAVE_SO_TXTIME)
+ struct cmsghdr *cm = nullptr;
+#endif
+#ifdef HAVE_SO_TXTIME
+ if (packet->p.send_at.tv_sec > 0) { // if set?
+ msg = &msgvec[vlen].msg_hdr;
+ msg->msg_controllen = CMSG_SPACE(sizeof(uint64_t));
+ msg->msg_control = msg_ctrl;
+ cm = CMSG_FIRSTHDR(msg);
+
+ cm->cmsg_level = SOL_SOCKET;
+ cm->cmsg_type = SCM_TXTIME;
+ cm->cmsg_len = CMSG_LEN(sizeof(uint64_t));
+
+ // Convert struct timespec to nanoseconds.
+ *((uint64_t *)CMSG_DATA(cm)) = packet->p.send_at.tv_sec * (1000ULL *
1000 * 1000) + packet->p.send_at.tv_nsec;
+ ;
+ }
+#endif
+
if (packet->p.segment_size > 0) {
// Presumes one big super buffer is given
ink_assert(packet->p.chain->next == nullptr);
@@ -1602,17 +1688,24 @@ UDPQueue::SendMultipleUDPPackets(UDPPacket **p,
uint16_t n)
msg->msg_namelen = ats_ip_size(packet->to);
union udp_segment_hdr *u;
- u = static_cast<union udp_segment_hdr
*>(alloca(sizeof(union udp_segment_hdr)));
- msg->msg_control = u->buf;
- msg->msg_controllen = sizeof(u->buf);
- iov = &iovec[iovec_used++];
- iov_len = 1;
- iov->iov_base = packet->p.chain.get()->start();
- iov->iov_len = packet->p.chain.get()->size();
- msg->msg_iov = iov;
- msg->msg_iovlen = iov_len;
-
- struct cmsghdr *cm = CMSG_FIRSTHDR(msg);
+ u = static_cast<union udp_segment_hdr *>(alloca(sizeof(union
udp_segment_hdr)));
+
+ iov = &iovec[iovec_used++];
+ iov_len = 1;
+ iov->iov_base = packet->p.chain.get()->start();
+ iov->iov_len = packet->p.chain.get()->size();
+ msg->msg_iov = iov;
+ msg->msg_iovlen = iov_len;
+
+ if (cm == nullptr) {
+ msg->msg_control = u->buf;
+ msg->msg_controllen = sizeof(u->buf);
+ cm = CMSG_FIRSTHDR(msg);
+ } else {
+ msg->msg_controllen += sizeof(u->buf);
+ cm = CMSG_NXTHDR(msg, cm);
+ }
+
cm->cmsg_level = SOL_UDP;
cm->cmsg_type = UDP_SEGMENT;
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));