This is an automated email from the ASF dual-hosted git repository. cmcfarlen pushed a commit to branch 10.0.x in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit d8d999ff211435587d033f8faa89078cd9b0b4f1
Author: Damian Meden <[email protected]>
AuthorDate: Thu May 9 22:38:55 2024 +0200

    UDP-Net: Add support for udp pacing if available. (#11330)

    This change uses SO_TXTIME to set a hint to schedule transmission of
    specific packets at specific times; we use the hint given by the quiche
    library when quiche_conn_send is invoked.

    (cherry picked from commit 9777b5a88c32d54f324c21d3b6ece59a280e226f)
---
 CMakeLists.txt                       |   9 +++
 include/iocore/net/UDPPacket.h       |   7 +-
 include/tscore/ink_config.h.cmake.in |   1 +
 src/iocore/net/P_QUICPacketHandler.h |   3 +-
 src/iocore/net/QUICNetVConnection.cc |  10 ++-
 src/iocore/net/QUICPacketHandler.cc  |   5 +-
 src/iocore/net/UnixUDPNet.cc         | 123 ++++++++++++++++++++++++++++++-----
 7 files changed, 138 insertions(+), 20 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7c2f3fe871..4e060e6c79 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -401,6 +401,15 @@ if(TS_USE_HWLOC)
   )
 endif()
 
+check_symbol_exists(SO_TXTIME "sys/socket.h" SO_TXTIME_FOUND)
+set(CMAKE_EXTRA_INCLUDE_FILES "linux/net_tstamp.h")
+check_type_size("struct sock_txtime" STRUCT_SOCK_TXTIME_FOUND)
+unset(CMAKE_EXTRA_INCLUDE_FILES)
+
+if(SO_TXTIME_FOUND AND STRUCT_SOCK_TXTIME_FOUND)
+  set(HAVE_SO_TXTIME TRUE)
+endif()
+
 option(USE_IOURING "Use experimental io_uring (linux only)" 0)
 if(HAVE_IOURING AND USE_IOURING)
   message(STATUS "Using io_uring")
diff --git a/include/iocore/net/UDPPacket.h b/include/iocore/net/UDPPacket.h
index b3a9dac1a2..349e3162e2 100644
--- a/include/iocore/net/UDPPacket.h
+++ b/include/iocore/net/UDPPacket.h
@@ -40,6 +40,9 @@ struct UDPPacketInternal {
 
   int reqGenerationNum = 0;
   ink_hrtime delivery_time = 0; // when to deliver packet
+#ifdef HAVE_SO_TXTIME
+  struct timespec send_at = {}; // SO_TXTIME transmit time; tv_sec == 0 means "no hint"
+#endif
 
   Ptr<IOBufferBlock> chain;
   Continuation *cont = nullptr; // callback on error
@@ -102,7 +105,9 @@ public:
      @param buf IOBufferBlock chain of data to use
      @param segment_size Segment size
+     @param send_at_hint Optional transmit time (CLOCK_MONOTONIC) used with SO_TXTIME; copied into the packet, may be nullptr
    */
-  static UDPPacket *new_UDPPacket(struct sockaddr const *to, ink_hrtime when, Ptr<IOBufferBlock> &buf, uint16_t segment_size = 0);
+  static UDPPacket *new_UDPPacket(struct sockaddr const *to, ink_hrtime when, Ptr<IOBufferBlock> &buf, uint16_t segment_size = 0,
+                                  struct timespec *send_at_hint = nullptr);
 
   /**
      Create a new packet to be delivered to application.
diff --git a/include/tscore/ink_config.h.cmake.in b/include/tscore/ink_config.h.cmake.in
index 5e33ca19de..3f9d848632 100644
--- a/include/tscore/ink_config.h.cmake.in
+++ b/include/tscore/ink_config.h.cmake.in
@@ -69,6 +69,7 @@
 #cmakedefine01 HAVE_STRSIGNAL
 #cmakedefine01 HAVE_SYSINFO
 #cmakedefine01 HAVE_PRCTL
+#cmakedefine HAVE_SO_TXTIME 1
 
 #cmakedefine01 HAVE_HWLOC_OBJ_PU
 
diff --git a/src/iocore/net/P_QUICPacketHandler.h b/src/iocore/net/P_QUICPacketHandler.h
index a825db65e4..9a34866e79 100644
--- a/src/iocore/net/P_QUICPacketHandler.h
+++ b/src/iocore/net/P_QUICPacketHandler.h
@@ -40,7 +40,8 @@ public:
   QUICPacketHandler();
   virtual ~QUICPacketHandler();
 
-  void send_packet(UDPConnection *udp_con, IpEndpoint &addr, Ptr<IOBufferBlock> udp_payload, uint16_t segment_size = 0);
+  void send_packet(UDPConnection *udp_con, IpEndpoint &addr, Ptr<IOBufferBlock> udp_payload, uint16_t segment_size = 0,
+                   struct timespec *send_at_hint = nullptr);
   void close_connection(QUICNetVConnection *conn);
 
 protected:
diff --git a/src/iocore/net/QUICNetVConnection.cc b/src/iocore/net/QUICNetVConnection.cc
index 670557cb89..c406799a68 100644
--- a/src/iocore/net/QUICNetVConnection.cc
+++ b/src/iocore/net/QUICNetVConnection.cc
@@ -629,6 +629,7 @@ QUICNetVConnection::_handle_write_ready()
 
   Ptr<IOBufferBlock> udp_payload;
   quiche_send_info send_info;
+  struct timespec send_at_hint = {};
   ssize_t res;
   ssize_t written = 0;
 
@@ -642,6 +643,13 @@ QUICNetVConnection::_handle_write_ready()
     while (written + max_udp_payload_size <= quantum) {
       res = quiche_conn_send(this->_quiche_con, reinterpret_cast<uint8_t *>(udp_payload->end()) + written, max_udp_payload_size,
                              &send_info);
+      // send_info is only meaningful when a packet was written; the first packet's time paces the whole batch.
+#ifdef HAVE_SO_TXTIME
+      if (written == 0 && res > 0) {
+        memcpy(&send_at_hint, &send_info.at, sizeof(struct timespec));
+      }
+#endif
+
       if (res > 0) {
         written += res;
       }
@@ -655,7 +663,7 @@ QUICNetVConnection::_handle_write_ready()
       if (static_cast<size_t>(written) > max_udp_payload_size) {
         segment_size = max_udp_payload_size;
       }
-      this->_packet_handler->send_packet(this->_udp_con, this->con.addr, udp_payload, segment_size);
+      this->_packet_handler->send_packet(this->_udp_con, this->con.addr, udp_payload, segment_size, &send_at_hint);
       net_activity(this, this_ethread());
     }
   }
diff --git a/src/iocore/net/QUICPacketHandler.cc b/src/iocore/net/QUICPacketHandler.cc
index b7d9f0f3a9..7ca3af1929 100644
--- a/src/iocore/net/QUICPacketHandler.cc
+++ b/src/iocore/net/QUICPacketHandler.cc
@@ -72,9 +72,10 @@ QUICPacketHandler::close_connection(QUICNetVConnection *conn)
 }
 
 void
-QUICPacketHandler::send_packet(UDPConnection *udp_con, IpEndpoint &addr, Ptr<IOBufferBlock> udp_payload, uint16_t segment_size)
+QUICPacketHandler::send_packet(UDPConnection *udp_con, IpEndpoint &addr, Ptr<IOBufferBlock> udp_payload, uint16_t segment_size,
+                               struct timespec *send_at_hint)
 {
-  UDPPacket *udp_packet = UDPPacket::new_UDPPacket(addr, 0, udp_payload, segment_size);
+  UDPPacket *udp_packet = UDPPacket::new_UDPPacket(addr, 0, udp_payload, segment_size, send_at_hint);
 
   if (is_debug_tag_set(v_debug_tag)) {
     ip_port_text_buffer ipb;
diff --git a/src/iocore/net/UnixUDPNet.cc b/src/iocore/net/UnixUDPNet.cc
index 1d926ee7bf..92f44fd0e6 100644
--- a/src/iocore/net/UnixUDPNet.cc
+++ b/src/iocore/net/UnixUDPNet.cc
@@ -45,6 +45,9 @@
 #include "tscore/ink_sock.h"
 #include <netinet/udp.h>
 #include "P_UnixNet.h"
+#ifdef HAVE_SO_TXTIME
+#include <linux/net_tstamp.h>
+#endif
 
 #ifndef UDP_SEGMENT
 // This is needed because old glibc may not have the constant even if Kernel supports it.
@@ -80,7 +83,8 @@ UDPPacket::new_UDPPacket()
 }
 
 UDPPacket *
-UDPPacket::new_UDPPacket(struct sockaddr const *to, ink_hrtime when, Ptr<IOBufferBlock> &buf, uint16_t segment_size)
+UDPPacket::new_UDPPacket(struct sockaddr const *to, ink_hrtime when, Ptr<IOBufferBlock> &buf, uint16_t segment_size,
+                         struct timespec *send_at_hint)
 {
   UDPPacket *p = udpPacketAllocator.alloc();
 
@@ -91,6 +95,11 @@ UDPPacket::new_UDPPacket(struct sockaddr const *to, ink_hrtime when, Ptr<IOBuffe
   ats_ip_copy(&p->to, to);
   p->p.chain        = buf;
   p->p.segment_size = segment_size;
+#ifdef HAVE_SO_TXTIME
+  // Copy the caller's timespec (not the pointer itself). Without a hint, clear the
+  // field so it never carries uninitialized or stale data into the send path.
+  p->p.send_at = (send_at_hint != nullptr) ? *send_at_hint : timespec{};
+#endif
 
   return p;
 }
@@ -1208,6 +1217,15 @@ UDPNetProcessor::UDPBind(Continuation *cont, sockaddr const *addr, int fd, int s
   }
 #endif
 
+#ifdef HAVE_SO_TXTIME
+  struct sock_txtime sk_txtime;
+  // NOTE(review): if this fails, later sends carrying SCM_TXTIME are expected to be rejected (EINVAL).
+  sk_txtime.clockid = CLOCK_MONOTONIC;
+  sk_txtime.flags   = 0;
+  if (setsockopt(fd, SOL_SOCKET, SO_TXTIME, &sk_txtime, sizeof(sk_txtime)) == -1) {
+    Dbg(dbg_ctl_udpnet, "Failed to setsockopt SO_TXTIME. errno=%d", errno);
+  }
+#endif
   // If this is a class D address (i.e. multicast address), use REUSEADDR.
   if (ats_is_ip_multicast(addr)) {
     if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEADDR) < 0) {
@@ -1423,6 +1441,34 @@ UDPQueue::SendUDPPacket(UDPPacket *p)
   msg.msg_name    = reinterpret_cast<caddr_t>(&p->to.sa);
   msg.msg_namelen = ats_ip_size(p->to);
 
+#if defined(SOL_UDP) || defined(HAVE_SO_TXTIME) // to avoid unused variables compiler warning.
+  struct cmsghdr *cm = nullptr;
+  alignas(struct cmsghdr) uint8_t msg_ctrl[0 // aligned like the union it replaces (see cmsg(3))
+#ifdef SOL_UDP
+                                           + CMSG_SPACE(sizeof(uint16_t))
+#endif
+#ifdef HAVE_SO_TXTIME
+                                           + CMSG_SPACE(sizeof(uint64_t))
+#endif
+  ];
+  memset(msg_ctrl, 0, sizeof(msg_ctrl));
+#endif // defined(SOL_UDP) || defined(HAVE_SO_TXTIME)
+
+#ifdef HAVE_SO_TXTIME
+  if (p->p.send_at.tv_sec > 0) {
+    msg.msg_control    = msg_ctrl;
+    msg.msg_controllen = CMSG_SPACE(sizeof(uint64_t));
+    cm                 = CMSG_FIRSTHDR(&msg);
+
+    cm->cmsg_level = SOL_SOCKET;
+    cm->cmsg_type  = SCM_TXTIME;
+    cm->cmsg_len   = CMSG_LEN(sizeof(uint64_t));
+    // Convert struct timespec to nanoseconds (CLOCK_MONOTONIC, matching the SO_TXTIME setup in UDPBind).
+    const uint64_t txtime_ns = p->p.send_at.tv_sec * (1000ULL * 1000 * 1000) + p->p.send_at.tv_nsec;
+    memcpy(CMSG_DATA(cm), &txtime_ns, sizeof(txtime_ns));
+  }
+#endif
+
   if (p->p.segment_size > 0) {
     ink_assert(p->p.chain->next == nullptr);
     msg.msg_iov    = iov;
@@ -1436,10 +1482,16 @@ UDPQueue::SendUDPPacket(UDPPacket *p)
       char buf[CMSG_SPACE(sizeof(uint16_t))];
       struct cmsghdr align;
     } u;
-    msg.msg_control    = u.buf;
-    msg.msg_controllen = sizeof(u.buf);
 
-    struct cmsghdr *cm = CMSG_FIRSTHDR(&msg);
+    if (cm == nullptr) {
+      msg.msg_control    = msg_ctrl;
+      msg.msg_controllen = sizeof(u.buf);
+      cm                 = CMSG_FIRSTHDR(&msg);
+
+    } else {
+      msg.msg_controllen += sizeof(u.buf);
+      cm                  = CMSG_NXTHDR(&msg, cm);
+    }
     cm->cmsg_level               = SOL_UDP;
     cm->cmsg_type                = UDP_SEGMENT;
     cm->cmsg_len                 = CMSG_LEN(sizeof(uint16_t));
@@ -1564,9 +1616,23 @@ UDPQueue::SendMultipleUDPPackets(UDPPacket **p, uint16_t n)
   } else {
     msgvec_size = sizeof(struct mmsghdr) * n * 64;
   }
+
 #else
   msgvec_size = sizeof(struct mmsghdr) * n * 64;
 #endif
+
+#if defined(SOL_UDP) || defined(HAVE_SO_TXTIME) // to avoid unused variables compiler warning.
+  uint8_t msg_ctrl[0
+#ifdef SOL_UDP
+                   + CMSG_SPACE(sizeof(uint16_t))
+#endif
+#ifdef HAVE_SO_TXTIME
+                   + CMSG_SPACE(sizeof(uint64_t))
+#endif
+  ];
+  memset(msg_ctrl, 0, sizeof(msg_ctrl));
+#endif // defined(SOL_UDP) || defined(HAVE_SO_TXTIME)
+
   // The sizeof(struct msghdr) is 56 bytes or so. It can be too big to stack (alloca).
   IOBufferBlock *tmp = new_IOBufferBlock();
   tmp->alloc(iobuffer_size_to_index(msgvec_size, BUFFER_SIZE_INDEX_1M));
@@ -1592,6 +1658,26 @@ UDPQueue::SendMultipleUDPPackets(UDPPacket **p, uint16_t n)
     packet                                  = p[i];
     packet->p.conn->lastSentPktStartTime = packet->p.delivery_time;
     ink_assert(packet->p.conn->getFd() == fd);
+#if defined(SOL_UDP) || defined(HAVE_SO_TXTIME)
+    struct cmsghdr *cm = nullptr;
+#endif
+#ifdef HAVE_SO_TXTIME
+    if (packet->p.send_at.tv_sec > 0) { // tv_sec == 0 means no transmit-time hint
+      msg                 = &msgvec[vlen].msg_hdr;
+      msg->msg_controllen = CMSG_SPACE(sizeof(uint64_t));
+      msg->msg_control    = alloca(sizeof(msg_ctrl)); // one buffer per message: the whole msgvec is sent as a batch
+      memset(msg->msg_control, 0, sizeof(msg_ctrl));
+      cm = CMSG_FIRSTHDR(msg);
+
+      cm->cmsg_level = SOL_SOCKET;
+      cm->cmsg_type  = SCM_TXTIME;
+      cm->cmsg_len   = CMSG_LEN(sizeof(uint64_t));
+      // Convert struct timespec to nanoseconds (CLOCK_MONOTONIC, matching the SO_TXTIME setup in UDPBind).
+      const uint64_t txtime_ns = packet->p.send_at.tv_sec * (1000ULL * 1000 * 1000) + packet->p.send_at.tv_nsec;
+      memcpy(CMSG_DATA(cm), &txtime_ns, sizeof(txtime_ns));
+    }
+#endif
+
     if (packet->p.segment_size > 0) {
       // Presumes one big super buffer is given
       ink_assert(packet->p.chain->next == nullptr);
@@ -1602,17 +1688,24 @@ UDPQueue::SendMultipleUDPPackets(UDPPacket **p, uint16_t n)
       msg->msg_namelen = ats_ip_size(packet->to);
 
       union udp_segment_hdr *u;
-      u                   = static_cast<union udp_segment_hdr *>(alloca(sizeof(union udp_segment_hdr)));
-      msg->msg_control    = u->buf;
-      msg->msg_controllen = sizeof(u->buf);
-      iov                 = &iovec[iovec_used++];
-      iov_len             = 1;
-      iov->iov_base       = packet->p.chain.get()->start();
-      iov->iov_len        = packet->p.chain.get()->size();
-      msg->msg_iov        = iov;
-      msg->msg_iovlen     = iov_len;
-
-      struct cmsghdr *cm = CMSG_FIRSTHDR(msg);
+      u = static_cast<union udp_segment_hdr *>(alloca(sizeof(union udp_segment_hdr)));
+
+      iov             = &iovec[iovec_used++];
+      iov_len         = 1;
+      iov->iov_base   = packet->p.chain.get()->start();
+      iov->iov_len    = packet->p.chain.get()->size();
+      msg->msg_iov    = iov;
+      msg->msg_iovlen = iov_len;
+
+      if (cm == nullptr) {
+        msg->msg_control    = u->buf;
+        msg->msg_controllen = sizeof(u->buf);
+        cm                  = CMSG_FIRSTHDR(msg);
+      } else {
+        msg->msg_controllen += sizeof(u->buf);
+        cm                   = CMSG_NXTHDR(msg, cm);
+      }
+
       cm->cmsg_level = SOL_UDP;
       cm->cmsg_type  = UDP_SEGMENT;
       cm->cmsg_len   = CMSG_LEN(sizeof(uint16_t));
