This is an automated email from the ASF dual-hosted git repository. cmcfarlen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push: new de8e2439f combine UDPPacket and UDPPacketInternal (#9424) de8e2439f is described below commit de8e2439fdd4dc491bb7af512b8f8603a81a7be1 Author: Chris McFarlen <ch...@mcfarlen.us> AuthorDate: Wed Mar 29 15:48:30 2023 -0500 combine UDPPacket and UDPPacketInternal (#9424) * combine UDPPacket and UDPPacketInternal * remove UDPPacketInternal from uncompiled code * PR comments * Re-introduce internal class as a private aggregate of udppacket. Include friend declarations for class that need private access --------- Co-authored-by: Chris McFarlen <cmcfar...@apple.com> --- iocore/net/I_UDPConnection.h | 11 +- iocore/net/I_UDPPacket.h | 76 ++++++++++- iocore/net/Makefile.am | 1 - iocore/net/P_QUICNet.h | 6 +- iocore/net/P_UDPConnection.h | 9 -- iocore/net/P_UDPNet.h | 61 ++++----- iocore/net/P_UDPPacket.h | 57 -------- iocore/net/P_UnixUDPConnection.h | 3 +- iocore/net/QUICNet.cc | 10 +- iocore/net/QUICPacketHandler.cc | 2 +- iocore/net/QUICPacketHandler_quiche.cc | 2 +- iocore/net/UnixUDPConnection.cc | 17 ++- iocore/net/UnixUDPNet.cc | 217 ++++++++++++------------------ iocore/net/quic/QUICPacketReceiveQueue.cc | 2 +- 14 files changed, 217 insertions(+), 257 deletions(-) diff --git a/iocore/net/I_UDPConnection.h b/iocore/net/I_UDPConnection.h index a56f5cd5d..a9ae05fe8 100644 --- a/iocore/net/I_UDPConnection.h +++ b/iocore/net/I_UDPConnection.h @@ -71,7 +71,7 @@ public: <b>Callbacks:</b><br> cont->handleEvent(NET_EVENT_DATAGRAM_ERROR, UDPConnection *) on error <br> - cont->handleEvent(NET_EVENT_DATAGRAM_READ_READY, Queue<UDPPacketInternal> *) on incoming packets. + cont->handleEvent(NET_EVENT_DATAGRAM_READ_READY, Queue<UDPPacket> *) on incoming packets. @return Action* Always returns nullptr. Can't be cancelled via this Action. @@ -100,6 +100,15 @@ public: void bindToThread(Continuation *c, EThread *t); virtual void UDPConnection_is_abstract() = 0; + + // this is for doing packet scheduling: we keep two values so that we can + // implement cancel. The first value tracks the startTime of the last + // packet that was sent on this connection; the second value tracks the + // startTime of the last packet when we are doing scheduling; whenever the + // associated continuation cancels a packet, we rest lastPktStartTime to be + // the same as the lastSentPktStartTime. + uint64_t lastSentPktStartTime = 0; + uint64_t lastPktStartTime = 0; }; extern UDPConnection *new_UDPConnection(int fd); diff --git a/iocore/net/I_UDPPacket.h b/iocore/net/I_UDPPacket.h index 59c7aa28c..d3705ed7f 100644 --- a/iocore/net/I_UDPPacket.h +++ b/iocore/net/I_UDPPacket.h @@ -32,6 +32,23 @@ #pragma once #include "I_UDPConnection.h" + +struct UDPPacketInternal { + // packet scheduling stuff: keep it a doubly linked list + uint64_t pktLength = 0; + uint16_t segment_size = 0; + + int reqGenerationNum = 0; + ink_hrtime delivery_time = 0; // when to deliver packet + + Ptr<IOBufferBlock> chain; + Continuation *cont = nullptr; // callback on error + UDPConnection *conn = nullptr; // connection where packet should be sent to. + + int in_the_priority_queue = 0; + int in_heap = 0; +}; + /** @name UDPPacket UDP packet functions used by UDPConnection */ @@ -41,14 +58,21 @@ */ class UDPPacket { + friend class UDPQueue; + friend class PacketQueue; + friend class UDPConnection; + friend class UnixUDPConnection; + public: - virtual ~UDPPacket() {} - virtual void free(); // fast deallocate + UDPPacket(); + ~UDPPacket(); + void free(); // fast deallocate + void setContinuation(Continuation *c); void setConnection(UDPConnection *c); UDPConnection *getConnection(); IOBufferBlock *getIOBlockChain(); - int64_t getPktLength() const; + int64_t getPktLength(); /** Add IOBufferBlock (chain) to end of packet. @@ -62,7 +86,6 @@ public: int from_size; LINK(UDPPacket, link); - // Factory (static) methods /** @@ -87,4 +110,49 @@ public: Internal function only */ static UDPPacket *new_incoming_UDPPacket(struct sockaddr *from, struct sockaddr *to, Ptr<IOBufferBlock> &block); + +private: + SLINK(UDPPacket, alink); // atomic link + UDPPacketInternal p; }; + +// Inline definitions + +inline void +UDPPacket::setContinuation(Continuation *c) +{ + p.cont = c; +} + +inline void +UDPPacket::setConnection(UDPConnection *c) +{ + /*Code reviewed by Case Larsen. Previously, we just had + ink_assert(!conn). This prevents tunneling of packets + correctly---that is, you get packets from a server on a udp + conn. and want to send it to a player on another connection, the + assert will prevent that. The "if" clause enables correct + handling of the connection ref. counts in such a scenario. */ + + if (p.conn) { + if (p.conn == c) { + return; + } + p.conn->Release(); + p.conn = nullptr; + } + p.conn = c; + p.conn->AddRef(); +} + +inline UDPConnection * +UDPPacket::getConnection() +{ + return p.conn; +} + +inline IOBufferBlock * +UDPPacket::getIOBlockChain() +{ + return p.chain.get(); +} diff --git a/iocore/net/Makefile.am b/iocore/net/Makefile.am index 1effafad5..2e9e800f1 100644 --- a/iocore/net/Makefile.am +++ b/iocore/net/Makefile.am @@ -164,7 +164,6 @@ libinknet_a_SOURCES = \ P_UDPConnection.h \ P_UDPIOEvent.h \ P_UDPNet.h \ - P_UDPPacket.h \ P_UnixCompletionUtil.h \ P_UnixNet.h \ P_UnixNetProcessor.h \ diff --git a/iocore/net/P_QUICNet.h b/iocore/net/P_QUICNet.h index 83f2795a5..0f0eb3ed0 100644 --- a/iocore/net/P_QUICNet.h +++ b/iocore/net/P_QUICNet.h @@ -39,8 +39,8 @@ void initialize_thread_for_quic_net(EThread *thread); struct QUICPollEvent { QUICConnection *con; - UDPPacketInternal *packet; - void init(QUICConnection *con, UDPPacketInternal *packet); + UDPPacket *packet; + void init(QUICConnection *con, UDPPacket *packet); void free(); SLINK(QUICPollEvent, alink); @@ -62,7 +62,7 @@ public: private: // Internal Queue to save Long Header Packet - Que(UDPPacketInternal, link) _longInQueue; + Que(UDPPacket, link) _longInQueue; private: #if TS_HAS_QUICHE diff --git a/iocore/net/P_UDPConnection.h b/iocore/net/P_UDPConnection.h index 95aedbcc8..5c7e38e83 100644 --- a/iocore/net/P_UDPConnection.h +++ b/iocore/net/P_UDPConnection.h @@ -47,15 +47,6 @@ public: bool binding_valid = false; int tobedestroyed = 0; int sendGenerationNum = 0; - - // this is for doing packet scheduling: we keep two values so that we can - // implement cancel. The first value tracks the startTime of the last - // packet that was sent on this connection; the second value tracks the - // startTime of the last packet when we are doing scheduling; whenever the - // associated continuation cancels a packet, we rest lastPktStartTime to be - // the same as the lastSentPktStartTime. - uint64_t lastSentPktStartTime = 0; - uint64_t lastPktStartTime = 0; }; TS_INLINE diff --git a/iocore/net/P_UDPNet.h b/iocore/net/P_UDPNet.h index 5f82d348f..d1d862b84 100644 --- a/iocore/net/P_UDPNet.h +++ b/iocore/net/P_UDPNet.h @@ -73,8 +73,8 @@ public: virtual ~PacketQueue() {} int nPackets = 0; ink_hrtime lastPullLongTermQ = 0; - Queue<UDPPacketInternal> longTermQ; - Queue<UDPPacketInternal> bucket[N_SLOTS]; + Queue<UDPPacket> longTermQ; + Queue<UDPPacket> bucket[N_SLOTS]; ink_hrtime delivery_time[N_SLOTS]; int now_slot = 0; @@ -93,7 +93,7 @@ public: } void - addPacket(UDPPacketInternal *e, ink_hrtime now = 0) + addPacket(UDPPacket *e, ink_hrtime now = 0) { int before = 0; int slot; @@ -107,10 +107,10 @@ public: ink_assert(delivery_time[now_slot]); - if (e->delivery_time < now) - e->delivery_time = now; + if (e->p.delivery_time < now) + e->p.delivery_time = now; - ink_hrtime s = e->delivery_time - delivery_time[now_slot]; + ink_hrtime s = e->p.delivery_time - delivery_time[now_slot]; if (s < 0) { before = 1; @@ -123,20 +123,21 @@ public: // from long-term slot whenever you advance. if (s >= N_SLOTS - 1) { longTermQ.enqueue(e); - e->in_heap = 0; - e->in_the_priority_queue = 1; + e->p.in_heap = 0; + e->p.in_the_priority_queue = 1; return; } slot = (s + now_slot) % N_SLOTS; // so that slot+1 is still "in future". - ink_assert((before || delivery_time[slot] <= e->delivery_time) && (delivery_time[(slot + 1) % N_SLOTS] >= e->delivery_time)); - e->in_the_priority_queue = 1; - e->in_heap = slot; + ink_assert((before || delivery_time[slot] <= e->p.delivery_time) && + (delivery_time[(slot + 1) % N_SLOTS] >= e->p.delivery_time)); + e->p.in_the_priority_queue = 1; + e->p.in_heap = slot; bucket[slot].enqueue(e); } - UDPPacketInternal * + UDPPacket * firstPacket(ink_hrtime t) { if (t > delivery_time[now_slot]) { @@ -146,7 +147,7 @@ public: } } - UDPPacketInternal * + UDPPacket * getFirstPacket() { nPackets--; @@ -161,21 +162,21 @@ public: } bool - IsCancelledPacket(UDPPacketInternal *p) + IsCancelledPacket(UDPPacket *p) { // discard packets that'll never get sent... - return ((p->conn->shouldDestroy()) || (p->conn->GetSendGenerationNumber() != p->reqGenerationNum)); + return ((p->p.conn->shouldDestroy()) || (p->p.conn->GetSendGenerationNumber() != p->p.reqGenerationNum)); } void FreeCancelledPackets(int numSlots) { - Queue<UDPPacketInternal> tempQ; + Queue<UDPPacket> tempQ; int i; for (i = 0; i < numSlots; i++) { int s = (now_slot + i) % N_SLOTS; - UDPPacketInternal *p; + UDPPacket *p; while (nullptr != (p = bucket[s].dequeue())) { if (IsCancelledPacket(p)) { p->free(); @@ -196,8 +197,8 @@ public: int s = now_slot; if (ink_hrtime_to_msec(t - lastPullLongTermQ) >= SLOT_TIME_MSEC * ((N_SLOTS - 1) / 2)) { - Queue<UDPPacketInternal> tempQ; - UDPPacketInternal *p; + Queue<UDPPacket> tempQ; + UDPPacket *p; // pull in all the stuff from long-term slot lastPullLongTermQ = t; // this is to handle weirdness where someone is trying to queue a @@ -233,23 +234,23 @@ public: private: void - remove(UDPPacketInternal *e) + remove(UDPPacket *e) { nPackets--; - ink_assert(e->in_the_priority_queue); - e->in_the_priority_queue = 0; - bucket[e->in_heap].remove(e); + ink_assert(e->p.in_the_priority_queue); + e->p.in_the_priority_queue = 0; + bucket[e->p.in_heap].remove(e); } public: - UDPPacketInternal * + UDPPacket * dequeue_ready(ink_hrtime t) { (void)t; - UDPPacketInternal *e = bucket[now_slot].dequeue(); + UDPPacket *e = bucket[now_slot].dequeue(); if (e) { - ink_assert(e->in_the_priority_queue); - e->in_the_priority_queue = 0; + ink_assert(e->p.in_the_priority_queue); + e->p.in_the_priority_queue = 0; } advanceNow(t); return e; @@ -288,13 +289,13 @@ class UDPQueue public: // Outgoing UDP Packet Queue - ASLL(UDPPacketInternal, alink) outQueue; + ASLL(UDPPacket, alink) outQueue; void service(UDPNetHandler *); void SendPackets(); - void SendUDPPacket(UDPPacketInternal *p); - int SendMultipleUDPPackets(UDPPacketInternal **p, uint16_t n); + void SendUDPPacket(UDPPacket *p); + int SendMultipleUDPPackets(UDPPacket **p, uint16_t n); // Interface exported to the outside world void send(UDPPacket *p); diff --git a/iocore/net/P_UDPPacket.h b/iocore/net/P_UDPPacket.h deleted file mode 100644 index 4f4caeee9..000000000 --- a/iocore/net/P_UDPPacket.h +++ /dev/null @@ -1,57 +0,0 @@ -/** @file - - A brief file description - - @section license License - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ - -/**************************************************************************** - - P_UDPPacket.h - Implementation of UDPPacket - - ****************************************************************************/ - -#pragma once - -#include "I_UDPNet.h" - -class UDPPacketInternal : public UDPPacket -{ -public: - UDPPacketInternal(); - ~UDPPacketInternal() override; - - void free() override; - - SLINK(UDPPacketInternal, alink); // atomic link - // packet scheduling stuff: keep it a doubly linked list - uint64_t pktLength = 0; - uint16_t segment_size = 0; - - int reqGenerationNum = 0; - ink_hrtime delivery_time = 0; // when to deliver packet - - Ptr<IOBufferBlock> chain; - Continuation *cont = nullptr; // callback on error - UDPConnectionInternal *conn = nullptr; // connection where packet should be sent to. - - int in_the_priority_queue = 0; - int in_heap = 0; -}; diff --git a/iocore/net/P_UnixUDPConnection.h b/iocore/net/P_UnixUDPConnection.h index 62b47f6dd..abed93ed3 100644 --- a/iocore/net/P_UnixUDPConnection.h +++ b/iocore/net/P_UnixUDPConnection.h @@ -31,7 +31,6 @@ #pragma once #include "P_UDPConnection.h" -#include "P_UDPPacket.h" class UnixUDPConnection : public UDPConnectionInternal { @@ -45,7 +44,7 @@ public: LINK(UnixUDPConnection, callback_link); // Incoming UDP Packet Queue - ASLL(UDPPacketInternal, alink) inQueue; + ASLL(UDPPacket, alink) inQueue; int onCallbackQueue = 0; Action *callbackAction = nullptr; EThread *ethread = nullptr; diff --git a/iocore/net/QUICNet.cc b/iocore/net/QUICNet.cc index 4d49f2dea..4d3ac6667 100644 --- a/iocore/net/QUICNet.cc +++ b/iocore/net/QUICNet.cc @@ -28,7 +28,7 @@ ClassAllocator<QUICPollEvent> quicPollEventAllocator("quicPollEvent"); void -QUICPollEvent::init(QUICConnection *con, UDPPacketInternal *packet) +QUICPollEvent::init(QUICConnection *con, UDPPacket *packet) { this->con = con; this->packet = packet; @@ -64,7 +64,7 @@ QUICPollCont::~QUICPollCont() {} void QUICPollCont::_process_packet(QUICPollEvent *e, NetHandler *nh) { - UDPPacketInternal *p = e->packet; + UDPPacket *p = e->packet; QUICNetVConnection *vc = static_cast<QUICNetVConnection *>(e->con); vc->read.triggered = 1; @@ -84,7 +84,7 @@ QUICPollCont::_process_packet(QUICPollEvent *e, NetHandler *nh) void QUICPollCont::_process_long_header_packet(QUICPollEvent *e, NetHandler *nh) { - UDPPacketInternal *p = e->packet; + UDPPacket *p = e->packet; // FIXME: VC is nullptr ? QUICNetVConnection *vc = static_cast<QUICNetVConnection *>(e->con); uint8_t *buf = reinterpret_cast<uint8_t *>(p->getIOBlockChain()->buf()); @@ -124,7 +124,7 @@ QUICPollCont::_process_long_header_packet(QUICPollEvent *e, NetHandler *nh) void QUICPollCont::_process_short_header_packet(QUICPollEvent *e, NetHandler *nh) { - UDPPacketInternal *p = e->packet; + UDPPacket *p = e->packet; QUICNetVConnection *vc = static_cast<QUICNetVConnection *>(e->con); vc->read.triggered = 1; @@ -158,7 +158,7 @@ QUICPollCont::pollEvent(int, Event *) Queue<QUICPollEvent> result; while ((e = aq.pop())) { QUICNetVConnection *qvc = static_cast<QUICNetVConnection *>(e->con); - UDPPacketInternal *p = e->packet; + UDPPacket *p = e->packet; if (qvc != nullptr && qvc->in_closed_queue) { p->free(); e->free(); diff --git a/iocore/net/QUICPacketHandler.cc b/iocore/net/QUICPacketHandler.cc index e44b388a2..25321fd36 100644 --- a/iocore/net/QUICPacketHandler.cc +++ b/iocore/net/QUICPacketHandler.cc @@ -391,7 +391,7 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) } QUICPollEvent *qe = quicPollEventAllocator.alloc(); - qe->init(qc, static_cast<UDPPacketInternal *>(udp_packet)); + qe->init(qc, static_cast<UDPPacket *>(udp_packet)); // Push the packet into QUICPollCont get_QUICPollCont(eth)->inQueue.push(qe); get_NetHandler(eth)->signalActivity(); diff --git a/iocore/net/QUICPacketHandler_quiche.cc b/iocore/net/QUICPacketHandler_quiche.cc index 34aa812e1..cc6b7d112 100644 --- a/iocore/net/QUICPacketHandler_quiche.cc +++ b/iocore/net/QUICPacketHandler_quiche.cc @@ -320,7 +320,7 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) eth = vc->thread; QUICPollEvent *qe = quicPollEventAllocator.alloc(); - qe->init(qc, static_cast<UDPPacketInternal *>(udp_packet)); + qe->init(qc, static_cast<UDPPacket *>(udp_packet)); // Push the packet into QUICPollCont get_QUICPollCont(eth)->inQueue.push(qe); get_NetHandler(eth)->signalActivity(); diff --git a/iocore/net/UnixUDPConnection.cc b/iocore/net/UnixUDPConnection.cc index 99d3d125f..92b9e6023 100644 --- a/iocore/net/UnixUDPConnection.cc +++ b/iocore/net/UnixUDPConnection.cc @@ -34,9 +34,9 @@ UnixUDPConnection::~UnixUDPConnection() { - UDPPacketInternal *p = nullptr; + UDPPacket *p = nullptr; - SList(UDPPacketInternal, alink) aq(inQueue.popall()); + SList(UDPPacket, alink) aq(inQueue.popall()); if (!tobedestroyed) { tobedestroyed = 1; @@ -76,11 +76,11 @@ UnixUDPConnection::callbackHandler(int event, void *data) Release(); return EVENT_CONT; } else { - UDPPacketInternal *p = nullptr; - SList(UDPPacketInternal, alink) aq(inQueue.popall()); + UDPPacket *p = nullptr; + SList(UDPPacket, alink) aq(inQueue.popall()); Debug("udpnet", "UDPConnection::callbackHandler"); - Queue<UDPPacketInternal> result; + Queue<UDPPacket> result; while ((p = aq.pop())) { result.push(p); } @@ -111,9 +111,8 @@ UDPConnection::bindToThread(Continuation *c, EThread *t) } Action * -UDPConnection::send(Continuation *c, UDPPacket *xp) +UDPConnection::send(Continuation *c, UDPPacket *p) { - UDPPacketInternal *p = (UDPPacketInternal *)xp; UnixUDPConnection *conn = (UnixUDPConnection *)this; if (shouldDestroy()) { @@ -127,8 +126,8 @@ UDPConnection::send(Continuation *c, UDPPacket *xp) p->setConnection(this); conn->continuation = c; ink_assert(conn->continuation != nullptr); - mutex = c->mutex; - p->reqGenerationNum = conn->sendGenerationNum; + mutex = c->mutex; + p->p.reqGenerationNum = conn->sendGenerationNum; get_UDPNetHandler(conn->ethread)->udpOutQueue.send(p); return nullptr; } diff --git a/iocore/net/UnixUDPNet.cc b/iocore/net/UnixUDPNet.cc index 60f6659e8..44db9494a 100644 --- a/iocore/net/UnixUDPNet.cc +++ b/iocore/net/UnixUDPNet.cc @@ -45,146 +45,96 @@ using UDPNetContHandler = int (UDPNetHandler::*)(int, void *); -ClassAllocator<UDPPacketInternal> udpPacketAllocator("udpPacketAllocator"); +ClassAllocator<UDPPacket> udpPacketAllocator("udpPacketAllocator"); EventType ET_UDP; -void -UDPPacketInternal::free() -{ - chain = nullptr; - if (conn) - conn->Release(); - conn = nullptr; - udpPacketAllocator.free(this); -} - UDPPacket * UDPPacket::new_UDPPacket() { - UDPPacketInternal *p = udpPacketAllocator.alloc(); - return p; + return udpPacketAllocator.alloc(); } UDPPacket * UDPPacket::new_UDPPacket(struct sockaddr const *to, ink_hrtime when, Ptr<IOBufferBlock> &buf, uint16_t segment_size) { - UDPPacketInternal *p = udpPacketAllocator.alloc(); + UDPPacket *p = udpPacketAllocator.alloc(); - p->in_the_priority_queue = 0; - p->in_heap = 0; - p->delivery_time = when; + p->p.in_the_priority_queue = 0; + p->p.in_heap = 0; + p->p.delivery_time = when; if (to) ats_ip_copy(&p->to, to); - p->chain = buf; - p->segment_size = segment_size; + p->p.chain = buf; + p->p.segment_size = segment_size; return p; } UDPPacket * UDPPacket::new_incoming_UDPPacket(struct sockaddr *from, struct sockaddr *to, Ptr<IOBufferBlock> &block) { - UDPPacketInternal *p = udpPacketAllocator.alloc(); + UDPPacket *p = udpPacketAllocator.alloc(); - p->in_the_priority_queue = 0; - p->in_heap = 0; - p->delivery_time = 0; + p->p.in_the_priority_queue = 0; + p->p.in_heap = 0; + p->p.delivery_time = 0; ats_ip_copy(&p->from, from); ats_ip_copy(&p->to, to); - p->chain = block; + p->p.chain = block; return p; } -UDPPacketInternal::UDPPacketInternal() +UDPPacket::UDPPacket() { memset(&from, '\0', sizeof(from)); memset(&to, '\0', sizeof(to)); } -UDPPacketInternal::~UDPPacketInternal() +UDPPacket::~UDPPacket() { - chain = nullptr; + p.chain = nullptr; } void UDPPacket::append_block(IOBufferBlock *block) { - UDPPacketInternal *p = static_cast<UDPPacketInternal *>(this); - if (block) { - if (p->chain) { // append to end - IOBufferBlock *last = p->chain.get(); + if (p.chain) { // append to end + IOBufferBlock *last = p.chain.get(); while (last->next) { last = last->next.get(); } last->next = block; } else { - p->chain = block; + p.chain = block; } } } int64_t -UDPPacket::getPktLength() const +UDPPacket::getPktLength() { - UDPPacketInternal *p = const_cast<UDPPacketInternal *>(static_cast<const UDPPacketInternal *>(this)); + UDPPacket *pkt = this; IOBufferBlock *b; - p->pktLength = 0; - b = p->chain.get(); + pkt->p.pktLength = 0; + b = pkt->p.chain.get(); while (b) { - p->pktLength += b->read_avail(); - b = b->next.get(); + pkt->p.pktLength += b->read_avail(); + b = b->next.get(); } - return p->pktLength; + return pkt->p.pktLength; } void UDPPacket::free() { - static_cast<UDPPacketInternal *>(this)->free(); -} - -void -UDPPacket::setContinuation(Continuation *c) -{ - static_cast<UDPPacketInternal *>(this)->cont = c; -} - -void -UDPPacket::setConnection(UDPConnection *c) -{ - /*Code reviewed by Case Larsen. Previously, we just had - ink_assert(!conn). This prevents tunneling of packets - correctly---that is, you get packets from a server on a udp - conn. and want to send it to a player on another connection, the - assert will prevent that. The "if" clause enables correct - handling of the connection ref. counts in such a scenario. */ - - UDPConnectionInternal *&conn = static_cast<UDPPacketInternal *>(this)->conn; - - if (conn) { - if (conn == c) - return; - conn->Release(); - conn = nullptr; - } - conn = static_cast<UDPConnectionInternal *>(c); - conn->AddRef(); -} - -IOBufferBlock * -UDPPacket::getIOBlockChain() -{ - ink_assert(dynamic_cast<UDPPacketInternal *>(this) != nullptr); - return static_cast<UDPPacketInternal *>(this)->chain.get(); -} - -UDPConnection * -UDPPacket::getConnection() -{ - return static_cast<UDPPacketInternal *>(this)->conn; + p.chain = nullptr; + if (p.conn) + p.conn->Release(); + p.conn = nullptr; + udpPacketAllocator.free(this); } // @@ -393,7 +343,7 @@ UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler *nh, UDPConnection *xuc UDPPacket *p = UDPPacket::new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr), ats_ip_sa_cast(&toaddr), chain); p->setConnection(uc); // queue onto the UDPConnection - uc->inQueue.push((UDPPacketInternal *)p); + uc->inQueue.push(p); // reload the unused block chain = next_chain; @@ -1038,10 +988,10 @@ UDPQueue::service(UDPNetHandler *nh) uint64_t timeSpent = 0; uint64_t pktSendStartTime; ink_hrtime pktSendTime; - UDPPacketInternal *p = nullptr; + UDPPacket *p = nullptr; - SList(UDPPacketInternal, alink) aq(outQueue.popall()); - Queue<UDPPacketInternal> stk; + SList(UDPPacket, alink) aq(outQueue.popall()); + Queue<UDPPacket> stk; while ((p = aq.pop())) { stk.push(p); } @@ -1052,14 +1002,14 @@ UDPQueue::service(UDPNetHandler *nh) ink_assert(p->link.next == nullptr); // insert into our queue. Debug("udp-send", "Adding %p", p); - if (p->conn->lastPktStartTime == 0) { - pktSendStartTime = std::max(now, p->delivery_time); + if (p->p.conn->lastPktStartTime == 0) { + pktSendStartTime = std::max(now, p->p.delivery_time); } else { - pktSendTime = p->delivery_time; - pktSendStartTime = std::max(std::max(now, pktSendTime), p->delivery_time); + pktSendTime = p->p.delivery_time; + pktSendStartTime = std::max(std::max(now, pktSendTime), p->p.delivery_time); } - p->conn->lastPktStartTime = pktSendStartTime; - p->delivery_time = pktSendStartTime; + p->p.conn->lastPktStartTime = pktSendStartTime; + p->p.delivery_time = pktSendStartTime; pipeInfo.addPacket(p, now); } @@ -1079,7 +1029,7 @@ UDPQueue::service(UDPNetHandler *nh) void UDPQueue::SendPackets() { - UDPPacketInternal *p; + UDPPacket *p; static ink_hrtime lastCleanupTime = Thread::get_hrtime_updated(); ink_hrtime now = Thread::get_hrtime_updated(); ink_hrtime send_threshold_time = now + SLOT_TIME; @@ -1094,7 +1044,7 @@ UDPQueue::SendPackets() #else constexpr int N_MAX_PACKETS = 1024; #endif - UDPPacketInternal *packets[N_MAX_PACKETS]; + UDPPacket *packets[N_MAX_PACKETS]; int nsent; int npackets; @@ -1107,10 +1057,10 @@ sendPackets: p = pipeInfo.getFirstPacket(); pktLen = p->getPktLength(); - if (p->conn->shouldDestroy()) { + if (p->p.conn->shouldDestroy()) { goto next_pkt; } - if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum) { + if (p->p.conn->GetSendGenerationNumber() != p->p.reqGenerationNum) { goto next_pkt; } @@ -1148,13 +1098,13 @@ sendPackets: } void -UDPQueue::SendUDPPacket(UDPPacketInternal *p) +UDPQueue::SendUDPPacket(UDPPacket *p) { struct msghdr msg; struct iovec iov[32]; int n, count = 0; - p->conn->lastSentPktStartTime = p->delivery_time; + p->p.conn->lastSentPktStartTime = p->p.delivery_time; Debug("udp-send", "Sending %p", p); msg.msg_control = nullptr; @@ -1163,14 +1113,14 @@ UDPQueue::SendUDPPacket(UDPPacketInternal *p) msg.msg_name = reinterpret_cast<caddr_t>(&p->to.sa); msg.msg_namelen = ats_ip_size(p->to); - if (p->segment_size > 0) { - ink_assert(p->chain->next == nullptr); + if (p->p.segment_size > 0) { + ink_assert(p->p.chain->next == nullptr); msg.msg_iov = iov; msg.msg_iovlen = 1; #ifdef SOL_UDP if (use_udp_gso) { - iov[0].iov_base = p->chain.get()->start(); - iov[0].iov_len = p->chain.get()->size(); + iov[0].iov_base = p->p.chain.get()->start(); + iov[0].iov_len = p->p.chain.get()->size(); union udp_segment_hdr { char buf[CMSG_SPACE(sizeof(uint16_t))]; @@ -1183,12 +1133,12 @@ UDPQueue::SendUDPPacket(UDPPacketInternal *p) cm->cmsg_level = SOL_UDP; cm->cmsg_type = UDP_SEGMENT; cm->cmsg_len = CMSG_LEN(sizeof(uint16_t)); - *((uint16_t *)CMSG_DATA(cm)) = p->segment_size; + *((uint16_t *)CMSG_DATA(cm)) = p->p.segment_size; count = 0; while (true) { // stupid Linux problem: sendmsg can return EAGAIN - n = ::sendmsg(p->conn->getFd(), &msg, 0); + n = ::sendmsg(p->p.conn->getFd(), &msg, 0); if (n >= 0) { break; } @@ -1214,14 +1164,15 @@ UDPQueue::SendUDPPacket(UDPPacketInternal *p) #endif // Send segments seprately if UDP_SEGMENT is not supported int offset = 0; - while (offset < p->chain.get()->size()) { - iov[0].iov_base = p->chain.get()->start() + offset; - iov[0].iov_len = std::min(static_cast<long>(p->segment_size), p->chain.get()->end() - static_cast<char *>(iov[0].iov_base)); + while (offset < p->p.chain.get()->size()) { + iov[0].iov_base = p->p.chain.get()->start() + offset; + iov[0].iov_len = + std::min(static_cast<long>(p->p.segment_size), p->p.chain.get()->end() - static_cast<char *>(iov[0].iov_base)); count = 0; while (true) { // stupid Linux problem: sendmsg can return EAGAIN - n = ::sendmsg(p->conn->getFd(), &msg, 0); + n = ::sendmsg(p->p.conn->getFd(), &msg, 0); if (n >= 0) { break; } @@ -1240,14 +1191,14 @@ UDPQueue::SendUDPPacket(UDPPacketInternal *p) offset += iov[0].iov_len; } - ink_assert(offset == p->chain.get()->size()); + ink_assert(offset == p->p.chain.get()->size()); #ifdef SOL_UDP } // use_udp_segment #endif } else { // Nothing is special int iov_len = 0; - for (IOBufferBlock *b = p->chain.get(); b != nullptr; b = b->next.get()) { + for (IOBufferBlock *b = p->p.chain.get(); b != nullptr; b = b->next.get()) { iov[iov_len].iov_base = static_cast<caddr_t>(b->start()); iov[iov_len].iov_len = b->size(); iov_len++; @@ -1258,7 +1209,7 @@ UDPQueue::SendUDPPacket(UDPPacketInternal *p) count = 0; while (true) { // stupid Linux problem: sendmsg can return EAGAIN - n = ::sendmsg(p->conn->getFd(), &msg, 0); + n = ::sendmsg(p->p.conn->getFd(), &msg, 0); if ((n >= 0) || (errno != EAGAIN)) { // send succeeded or some random error happened. if (n < 0) { @@ -1283,11 +1234,11 @@ void UDPQueue::send(UDPPacket *p) { // XXX: maybe fastpath for immediate send? - outQueue.push((UDPPacketInternal *)p); + outQueue.push(p); } int -UDPQueue::SendMultipleUDPPackets(UDPPacketInternal **p, uint16_t n) +UDPQueue::SendMultipleUDPPackets(UDPPacket **p, uint16_t n) { #ifdef HAVE_SENDMMSG struct mmsghdr *msgvec; @@ -1321,19 +1272,19 @@ UDPQueue::SendMultipleUDPPackets(UDPPacketInternal **p, uint16_t n) int iovec_used = 0; int vlen = 0; - int fd = p[0]->conn->getFd(); + int fd = p[0]->p.conn->getFd(); for (int i = 0; i < n; ++i) { - UDPPacketInternal *packet; + UDPPacket *packet; struct msghdr *msg; struct iovec *iov; int iov_len; - packet = p[i]; - packet->conn->lastSentPktStartTime = packet->delivery_time; - ink_assert(packet->conn->getFd() == fd); - if (packet->segment_size > 0) { + packet = p[i]; + packet->p.conn->lastSentPktStartTime = packet->p.delivery_time; + ink_assert(packet->p.conn->getFd() == fd); + if (packet->p.segment_size > 0) { // Presumes one big super buffer is given - ink_assert(packet->chain->next == nullptr); + ink_assert(packet->p.chain->next == nullptr); #ifdef SOL_UDP if (use_udp_gso) { msg = &msgvec[vlen].msg_hdr; @@ -1346,8 +1297,8 @@ UDPQueue::SendMultipleUDPPackets(UDPPacketInternal **p, uint16_t n) msg->msg_controllen = sizeof(u->buf); iov = &iovec[iovec_used++]; iov_len = 1; - iov->iov_base = packet->chain.get()->start(); - iov->iov_len = packet->chain.get()->size(); + iov->iov_base = packet->p.chain.get()->start(); + iov->iov_len = packet->p.chain.get()->size(); msg->msg_iov = iov; msg->msg_iovlen = iov_len; @@ -1355,28 +1306,28 @@ UDPQueue::SendMultipleUDPPackets(UDPPacketInternal **p, uint16_t n) cm->cmsg_level = SOL_UDP; cm->cmsg_type = UDP_SEGMENT; cm->cmsg_len = CMSG_LEN(sizeof(uint16_t)); - *((uint16_t *)CMSG_DATA(cm)) = packet->segment_size; + *((uint16_t *)CMSG_DATA(cm)) = packet->p.segment_size; vlen++; } else { #endif // UDP_SEGMENT is unavailable // Send the given data as multiple messages int offset = 0; - while (offset < packet->chain.get()->size()) { + while (offset < packet->p.chain.get()->size()) { msg = &msgvec[vlen].msg_hdr; msg->msg_name = reinterpret_cast<caddr_t>(&packet->to.sa); msg->msg_namelen = ats_ip_size(packet->to); iov = &iovec[iovec_used++]; iov_len = 1; - iov->iov_base = packet->chain.get()->start() + offset; - iov->iov_len = - std::min(packet->segment_size, static_cast<uint16_t>(packet->chain.get()->end() - static_cast<char *>(iov->iov_base))); - msg->msg_iov = iov; - msg->msg_iovlen = iov_len; - offset += iov->iov_len; + iov->iov_base = packet->p.chain.get()->start() + offset; + iov->iov_len = std::min(packet->p.segment_size, + static_cast<uint16_t>(packet->p.chain.get()->end() - static_cast<char *>(iov->iov_base))); + msg->msg_iov = iov; + msg->msg_iovlen = iov_len; + offset += iov->iov_len; vlen++; } - ink_assert(offset == packet->chain.get()->size()); + ink_assert(offset == packet->p.chain.get()->size()); #ifdef SOL_UDP } // use_udp_gso #endif @@ -1387,7 +1338,7 @@ UDPQueue::SendMultipleUDPPackets(UDPPacketInternal **p, uint16_t n) msg->msg_namelen = ats_ip_size(packet->to); iov = &iovec[iovec_used++]; iov_len = 0; - for (IOBufferBlock *b = packet->chain.get(); b != nullptr; b = b->next.get()) { + for (IOBufferBlock *b = packet->p.chain.get(); b != nullptr; b = b->next.get()) { iov[iov_len].iov_base = static_cast<caddr_t>(b->start()); iov[iov_len].iov_len = b->size(); iov_len++; @@ -1429,10 +1380,10 @@ UDPQueue::SendMultipleUDPPackets(UDPPacketInternal **p, uint16_t n) int i = 0; int nmsg = res; for (i = 0; i < n && res > 0; ++i) { - if (p[i]->segment_size == 0) { + if (p[i]->p.segment_size == 0) { res -= 1; } else { - res -= (p[i]->chain.get()->size() / p[i]->segment_size) + ((p[i]->chain.get()->size() % p[i]->segment_size) != 0); + res -= (p[i]->p.chain.get()->size() / p[i]->p.segment_size) + ((p[i]->p.chain.get()->size() % p[i]->p.segment_size) != 0); } } Debug("udp-send", "Sent %d messages by processing %d UDPPackets", nmsg, i); diff --git a/iocore/net/quic/QUICPacketReceiveQueue.cc b/iocore/net/quic/QUICPacketReceiveQueue.cc index a3594519b..3394ba9bd 100644 --- a/iocore/net/quic/QUICPacketReceiveQueue.cc +++ b/iocore/net/quic/QUICPacketReceiveQueue.cc @@ -27,7 +27,7 @@ #include "QUICIntUtil.h" #include "P_UDPConnection.h" -#include "P_UDPPacket.h" +#include "I_UDPPacket.h" static bool is_vn(QUICVersion v)