This is an automated email from the ASF dual-hosted git repository.

dmeden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 9777b5a88c UDP-Net: Add support for udp pacing if available. (#11330)
9777b5a88c is described below

commit 9777b5a88c32d54f324c21d3b6ece59a280e226f
Author: Damian Meden <[email protected]>
AuthorDate: Thu May 9 22:38:55 2024 +0200

    UDP-Net: Add support for udp pacing if available. (#11330)
    
    This change uses SO_TXTIME to set a hint to schedule transmission
    of specific packets at specific times, we use the hint given by
    the quiche library when quiche_conn_send is invoked.
---
 CMakeLists.txt                       |   9 +++
 include/iocore/net/UDPPacket.h       |   6 +-
 include/tscore/ink_config.h.cmake.in |   1 +
 src/iocore/net/P_QUICPacketHandler.h |   3 +-
 src/iocore/net/QUICNetVConnection.cc |  10 ++-
 src/iocore/net/QUICPacketHandler.cc  |   5 +-
 src/iocore/net/UnixUDPNet.cc         | 123 ++++++++++++++++++++++++++++++-----
 7 files changed, 137 insertions(+), 20 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index e127bab5c2..2eefcebcdb 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -407,6 +407,15 @@ if(TS_USE_HWLOC)
   )
 endif()
 
+check_symbol_exists(SO_TXTIME "sys/socket.h" SO_TXTIME_FOUND)
+set(CMAKE_EXTRA_INCLUDE_FILES "linux/net_tstamp.h")
+check_type_size("struct sock_txtime" STRUCT_SOCK_TXTIME_FOUND)
+unset(CMAKE_EXTRA_INCLUDE_FILES)
+
+if(SO_TXTIME_FOUND AND STRUCT_SOCK_TXTIME_FOUND)
+  set(HAVE_SO_TXTIME TRUE)
+endif()
+
 option(USE_IOURING "Use experimental io_uring (linux only)" 0)
 if(HAVE_IOURING AND USE_IOURING)
   message(STATUS "Using io_uring")
diff --git a/include/iocore/net/UDPPacket.h b/include/iocore/net/UDPPacket.h
index b3a9dac1a2..349e3162e2 100644
--- a/include/iocore/net/UDPPacket.h
+++ b/include/iocore/net/UDPPacket.h
@@ -40,6 +40,9 @@ struct UDPPacketInternal {
 
   int        reqGenerationNum = 0;
   ink_hrtime delivery_time    = 0; // when to deliver packet
+#ifdef HAVE_SO_TXTIME
+  struct timespec send_at;
+#endif
 
   Ptr<IOBufferBlock> chain;
   Continuation      *cont = nullptr; // callback on error
@@ -102,7 +105,8 @@ public:
      @param buf IOBufferBlock chain of data to use
      @param segment_size Segment size
   */
-  static UDPPacket *new_UDPPacket(struct sockaddr const *to, ink_hrtime when, 
Ptr<IOBufferBlock> &buf, uint16_t segment_size = 0);
+  static UDPPacket *new_UDPPacket(struct sockaddr const *to, ink_hrtime when, 
Ptr<IOBufferBlock> &buf, uint16_t segment_size = 0,
+                                  struct timespec *send_at_hint = nullptr);
 
   /**
      Create a new packet to be delivered to application.
diff --git a/include/tscore/ink_config.h.cmake.in 
b/include/tscore/ink_config.h.cmake.in
index 5e33ca19de..3f9d848632 100644
--- a/include/tscore/ink_config.h.cmake.in
+++ b/include/tscore/ink_config.h.cmake.in
@@ -69,6 +69,7 @@
 #cmakedefine01 HAVE_STRSIGNAL
 #cmakedefine01 HAVE_SYSINFO
 #cmakedefine01 HAVE_PRCTL
+#cmakedefine HAVE_SO_TXTIME 1
 
 #cmakedefine01 HAVE_HWLOC_OBJ_PU
 
diff --git a/src/iocore/net/P_QUICPacketHandler.h 
b/src/iocore/net/P_QUICPacketHandler.h
index a825db65e4..9a34866e79 100644
--- a/src/iocore/net/P_QUICPacketHandler.h
+++ b/src/iocore/net/P_QUICPacketHandler.h
@@ -40,7 +40,8 @@ public:
   QUICPacketHandler();
   virtual ~QUICPacketHandler();
 
-  void send_packet(UDPConnection *udp_con, IpEndpoint &addr, 
Ptr<IOBufferBlock> udp_payload, uint16_t segment_size = 0);
+  void send_packet(UDPConnection *udp_con, IpEndpoint &addr, 
Ptr<IOBufferBlock> udp_payload, uint16_t segment_size = 0,
+                   struct timespec *send_at_hint = nullptr);
   void close_connection(QUICNetVConnection *conn);
 
 protected:
diff --git a/src/iocore/net/QUICNetVConnection.cc 
b/src/iocore/net/QUICNetVConnection.cc
index 203eec6dda..a52b9320a7 100644
--- a/src/iocore/net/QUICNetVConnection.cc
+++ b/src/iocore/net/QUICNetVConnection.cc
@@ -670,6 +670,7 @@ QUICNetVConnection::_handle_write_ready()
 
   Ptr<IOBufferBlock> udp_payload;
   quiche_send_info   send_info;
+  struct timespec    send_at_hint;
   ssize_t            res;
   ssize_t            written = 0;
 
@@ -683,6 +684,13 @@ QUICNetVConnection::_handle_write_ready()
   while (written + max_udp_payload_size <= quantum) {
     res = quiche_conn_send(this->_quiche_con, reinterpret_cast<uint8_t 
*>(udp_payload->end()) + written, max_udp_payload_size,
                            &send_info);
+
+#ifdef HAVE_SO_TXTIME
+    if (written == 0) {
+      memcpy(&send_at_hint, &send_info.at, sizeof(struct timespec));
+    }
+#endif
+
     if (res > 0) {
       written += res;
     }
@@ -696,7 +704,7 @@ QUICNetVConnection::_handle_write_ready()
     if (static_cast<size_t>(written) > max_udp_payload_size) {
       segment_size = max_udp_payload_size;
     }
-    this->_packet_handler->send_packet(this->_udp_con, this->con.addr, 
udp_payload, segment_size);
+    this->_packet_handler->send_packet(this->_udp_con, this->con.addr, 
udp_payload, segment_size, &send_at_hint);
     net_activity(this, this_ethread());
   }
 }
diff --git a/src/iocore/net/QUICPacketHandler.cc 
b/src/iocore/net/QUICPacketHandler.cc
index 8576553148..d336fcd57e 100644
--- a/src/iocore/net/QUICPacketHandler.cc
+++ b/src/iocore/net/QUICPacketHandler.cc
@@ -80,9 +80,10 @@ QUICPacketHandler::close_connection(QUICNetVConnection *conn)
 }
 
 void
-QUICPacketHandler::send_packet(UDPConnection *udp_con, IpEndpoint &addr, 
Ptr<IOBufferBlock> udp_payload, uint16_t segment_size)
+QUICPacketHandler::send_packet(UDPConnection *udp_con, IpEndpoint &addr, 
Ptr<IOBufferBlock> udp_payload, uint16_t segment_size,
+                               struct timespec *send_at_hint)
 {
-  UDPPacket *udp_packet = UDPPacket::new_UDPPacket(addr, 0, udp_payload, 
segment_size);
+  UDPPacket *udp_packet = UDPPacket::new_UDPPacket(addr, 0, udp_payload, 
segment_size, send_at_hint);
 
   if (dbg_ctl_v.on()) {
     ip_port_text_buffer ipb;
diff --git a/src/iocore/net/UnixUDPNet.cc b/src/iocore/net/UnixUDPNet.cc
index 1d926ee7bf..92f44fd0e6 100644
--- a/src/iocore/net/UnixUDPNet.cc
+++ b/src/iocore/net/UnixUDPNet.cc
@@ -45,6 +45,9 @@
 #include "tscore/ink_sock.h"
 #include <netinet/udp.h>
 #include "P_UnixNet.h"
+#ifdef HAVE_SO_TXTIME
+#include <linux/net_tstamp.h>
+#endif
 
 #ifndef UDP_SEGMENT
 // This is needed because old glibc may not have the constant even if Kernel 
supports it.
@@ -80,7 +83,8 @@ UDPPacket::new_UDPPacket()
 }
 
 UDPPacket *
-UDPPacket::new_UDPPacket(struct sockaddr const *to, ink_hrtime when, 
Ptr<IOBufferBlock> &buf, uint16_t segment_size)
+UDPPacket::new_UDPPacket(struct sockaddr const *to, ink_hrtime when, 
Ptr<IOBufferBlock> &buf, uint16_t segment_size,
+                         struct timespec *send_at_hint)
 {
   UDPPacket *p = udpPacketAllocator.alloc();
 
@@ -91,6 +95,11 @@ UDPPacket::new_UDPPacket(struct sockaddr const *to, 
ink_hrtime when, Ptr<IOBuffe
     ats_ip_copy(&p->to, to);
   p->p.chain        = buf;
   p->p.segment_size = segment_size;
+#ifdef HAVE_SO_TXTIME
+  if (send_at_hint) {
+    memcpy(&p->p.send_at, &send_at_hint, sizeof(struct timespec));
+  }
+#endif
   return p;
 }
 
@@ -1208,6 +1217,15 @@ UDPNetProcessor::UDPBind(Continuation *cont, sockaddr 
const *addr, int fd, int s
   }
 #endif
 
+#ifdef HAVE_SO_TXTIME
+  struct sock_txtime sk_txtime;
+
+  sk_txtime.clockid = CLOCK_MONOTONIC;
+  sk_txtime.flags   = 0;
+  if (setsockopt(fd, SOL_SOCKET, SO_TXTIME, &sk_txtime, sizeof(sk_txtime)) == 
-1) {
+    Dbg(dbg_ctl_udpnet, "Failed to setsockopt SO_TXTIME. errno=%d", errno);
+  }
+#endif
   // If this is a class D address (i.e. multicast address), use REUSEADDR.
   if (ats_is_ip_multicast(addr)) {
     if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEADDR) < 0) {
@@ -1423,6 +1441,34 @@ UDPQueue::SendUDPPacket(UDPPacket *p)
   msg.msg_name       = reinterpret_cast<caddr_t>(&p->to.sa);
   msg.msg_namelen    = ats_ip_size(p->to);
 
+#if defined(SOL_UDP) || defined(HAVE_SO_TXTIME) // to avoid unused variables 
compiler warning.
+  struct cmsghdr *cm = nullptr;
+  uint8_t         msg_ctrl[0
+#ifdef SOL_UDP
+                   + CMSG_SPACE(sizeof(uint16_t))
+#endif
+#ifdef HAVE_SO_TXTIME
+                   + CMSG_SPACE(sizeof(uint64_t))
+#endif
+  ];
+  memset(msg_ctrl, 0, sizeof(msg_ctrl));
+#endif // defined(SOL_UDP) || defined(HAVE_SO_TXTIME)
+
+#ifdef HAVE_SO_TXTIME
+  if (p->p.send_at.tv_sec > 0) {
+    msg.msg_control    = msg_ctrl;
+    msg.msg_controllen = CMSG_SPACE(sizeof(uint64_t));
+    cm                 = CMSG_FIRSTHDR(&msg);
+
+    cm->cmsg_level = SOL_SOCKET;
+    cm->cmsg_type  = SCM_TXTIME;
+    cm->cmsg_len   = CMSG_LEN(sizeof(uint64_t));
+
+    // Convert struct timespec to nanoseconds.
+    *((uint64_t *)CMSG_DATA(cm)) = p->p.send_at.tv_sec * (1000ULL * 1000 * 
1000) + p->p.send_at.tv_nsec;
+  }
+#endif
+
   if (p->p.segment_size > 0) {
     ink_assert(p->p.chain->next == nullptr);
     msg.msg_iov    = iov;
@@ -1436,10 +1482,16 @@ UDPQueue::SendUDPPacket(UDPPacket *p)
         char           buf[CMSG_SPACE(sizeof(uint16_t))];
         struct cmsghdr align;
       } u;
-      msg.msg_control    = u.buf;
-      msg.msg_controllen = sizeof(u.buf);
 
-      struct cmsghdr *cm           = CMSG_FIRSTHDR(&msg);
+      if (cm == nullptr) {
+        msg.msg_control    = msg_ctrl;
+        msg.msg_controllen = sizeof(u.buf);
+        cm                 = CMSG_FIRSTHDR(&msg);
+
+      } else {
+        msg.msg_controllen += sizeof(u.buf);
+        cm                  = CMSG_NXTHDR(&msg, cm);
+      }
       cm->cmsg_level               = SOL_UDP;
       cm->cmsg_type                = UDP_SEGMENT;
       cm->cmsg_len                 = CMSG_LEN(sizeof(uint16_t));
@@ -1564,9 +1616,23 @@ UDPQueue::SendMultipleUDPPackets(UDPPacket **p, uint16_t 
n)
   } else {
     msgvec_size = sizeof(struct mmsghdr) * n * 64;
   }
+
 #else
   msgvec_size = sizeof(struct mmsghdr) * n * 64;
 #endif
+
+#if defined(SOL_UDP) || defined(HAVE_SO_TXTIME) // to avoid unused variables 
compiler warning.
+  uint8_t msg_ctrl[0
+#ifdef SOL_UDP
+                   + CMSG_SPACE(sizeof(uint16_t))
+#endif
+#ifdef HAVE_SO_TXTIME
+                   + CMSG_SPACE(sizeof(uint64_t))
+#endif
+  ];
+  memset(msg_ctrl, 0, sizeof(msg_ctrl));
+#endif // defined(SOL_UDP) || defined(HAVE_SO_TXTIME)
+
   // The sizeof(struct msghdr) is 56 bytes or so. It can be too big to stack 
(alloca).
   IOBufferBlock *tmp = new_IOBufferBlock();
   tmp->alloc(iobuffer_size_to_index(msgvec_size, BUFFER_SIZE_INDEX_1M));
@@ -1592,6 +1658,26 @@ UDPQueue::SendMultipleUDPPackets(UDPPacket **p, uint16_t 
n)
     packet                               = p[i];
     packet->p.conn->lastSentPktStartTime = packet->p.delivery_time;
     ink_assert(packet->p.conn->getFd() == fd);
+#if defined(SOL_UDP) || defined(HAVE_SO_TXTIME)
+    struct cmsghdr *cm = nullptr;
+#endif
+#ifdef HAVE_SO_TXTIME
+    if (packet->p.send_at.tv_sec > 0) { // if set?
+      msg                 = &msgvec[vlen].msg_hdr;
+      msg->msg_controllen = CMSG_SPACE(sizeof(uint64_t));
+      msg->msg_control    = msg_ctrl;
+      cm                  = CMSG_FIRSTHDR(msg);
+
+      cm->cmsg_level = SOL_SOCKET;
+      cm->cmsg_type  = SCM_TXTIME;
+      cm->cmsg_len   = CMSG_LEN(sizeof(uint64_t));
+
+      // Convert struct timespec to nanoseconds.
+      *((uint64_t *)CMSG_DATA(cm)) = packet->p.send_at.tv_sec * (1000ULL * 
1000 * 1000) + packet->p.send_at.tv_nsec;
+      ;
+    }
+#endif
+
     if (packet->p.segment_size > 0) {
       // Presumes one big super buffer is given
       ink_assert(packet->p.chain->next == nullptr);
@@ -1602,17 +1688,24 @@ UDPQueue::SendMultipleUDPPackets(UDPPacket **p, 
uint16_t n)
         msg->msg_namelen = ats_ip_size(packet->to);
 
         union udp_segment_hdr *u;
-        u                   = static_cast<union udp_segment_hdr 
*>(alloca(sizeof(union udp_segment_hdr)));
-        msg->msg_control    = u->buf;
-        msg->msg_controllen = sizeof(u->buf);
-        iov                 = &iovec[iovec_used++];
-        iov_len             = 1;
-        iov->iov_base       = packet->p.chain.get()->start();
-        iov->iov_len        = packet->p.chain.get()->size();
-        msg->msg_iov        = iov;
-        msg->msg_iovlen     = iov_len;
-
-        struct cmsghdr *cm           = CMSG_FIRSTHDR(msg);
+        u = static_cast<union udp_segment_hdr *>(alloca(sizeof(union 
udp_segment_hdr)));
+
+        iov             = &iovec[iovec_used++];
+        iov_len         = 1;
+        iov->iov_base   = packet->p.chain.get()->start();
+        iov->iov_len    = packet->p.chain.get()->size();
+        msg->msg_iov    = iov;
+        msg->msg_iovlen = iov_len;
+
+        if (cm == nullptr) {
+          msg->msg_control    = u->buf;
+          msg->msg_controllen = sizeof(u->buf);
+          cm                  = CMSG_FIRSTHDR(msg);
+        } else {
+          msg->msg_controllen += sizeof(u->buf);
+          cm                   = CMSG_NXTHDR(msg, cm);
+        }
+
         cm->cmsg_level               = SOL_UDP;
         cm->cmsg_type                = UDP_SEGMENT;
         cm->cmsg_len                 = CMSG_LEN(sizeof(uint16_t));

Reply via email to