TS-1067 More cleanup of unused code
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/7a57cd36 Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/7a57cd36 Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/7a57cd36 Branch: refs/heads/master Commit: 7a57cd365b412a3043f7ccc12410860e2cc4d357 Parents: 02bd9a7 Author: Leif Hedstrom <[email protected]> Authored: Tue Mar 26 21:59:11 2013 -0700 Committer: Leif Hedstrom <[email protected]> Committed: Tue Apr 2 13:53:42 2013 -0600 ---------------------------------------------------------------------- iocore/net/I_UDPPacket.h | 15 ++-- iocore/net/P_UDPNet.h | 4 - iocore/net/P_UDPPacket.h | 35 +-------- iocore/net/UnixUDPNet.cc | 158 ++++++++++++++++------------------------- 4 files changed, 70 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7a57cd36/iocore/net/I_UDPPacket.h ---------------------------------------------------------------------- diff --git a/iocore/net/I_UDPPacket.h b/iocore/net/I_UDPPacket.h index 09826dc..08dfa88 100644 --- a/iocore/net/I_UDPPacket.h +++ b/iocore/net/I_UDPPacket.h @@ -45,24 +45,22 @@ class UDPPacket public: - virtual ~ UDPPacket() - { - }; - virtual void free(); // fast deallocate + virtual ~UDPPacket() + { } + virtual void free(); // fast deallocate void setContinuation(Continuation * c); void setConnection(UDPConnection * c); UDPConnection *getConnection(); - void setArrivalTime(ink_hrtime t); IOBufferBlock *getIOBlockChain(); int64_t getPktLength(); + /** Add IOBufferBlock (chain) to end of packet. @param block block chain to add. */ inkcoreapi void append_block(IOBufferBlock * block); - virtual void UDPPacket_is_abstract() = 0; IpEndpoint from; // what address came from IpEndpoint to; // what address to send to @@ -94,8 +92,9 @@ extern UDPPacket *new_UDPPacket(struct sockaddr const* to, ink_hrtime when = 0, for packet @param len # of bytes to reference from block */ -extern UDPPacket *new_UDPPacket(struct sockaddr const* to, - ink_hrtime when = 0, IOBufferBlock * block = NULL, int len = 0); + +TS_INLINE UDPPacket *new_UDPPacket(struct sockaddr const* to, ink_hrtime when = 0, + IOBufferBlock * block = NULL, int len = 0); /** Create a new packet to be sent over UDPConnection. Packet has no destination or data. http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7a57cd36/iocore/net/P_UDPNet.h ---------------------------------------------------------------------- diff --git a/iocore/net/P_UDPNet.h b/iocore/net/P_UDPNet.h index 9c564f3..9cba1b3 100644 --- a/iocore/net/P_UDPNet.h +++ b/iocore/net/P_UDPNet.h @@ -70,7 +70,6 @@ public: // Interface exported to the outside world void send(UDPPacket * p); - Queue<UDPPacketInternal> reliabilityPktQueue; InkAtomicList atomicQueue; ink_hrtime last_report; ink_hrtime last_service; @@ -81,8 +80,6 @@ public: ~UDPQueue(); }; -#ifdef PACKETQUEUE_IMPL_AS_RING - // 20 ms slots; 2048 slots => 40 sec. into the future #define SLOT_TIME_MSEC 20 #define SLOT_TIME HRTIME_MSECONDS(SLOT_TIME_MSEC) @@ -296,7 +293,6 @@ private: void kill_cancelled_events() { } }; -#endif void initialize_thread_for_udp_net(EThread * thread); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7a57cd36/iocore/net/P_UDPPacket.h ---------------------------------------------------------------------- diff --git a/iocore/net/P_UDPPacket.h b/iocore/net/P_UDPPacket.h index a93f083..f56e0b9 100644 --- a/iocore/net/P_UDPPacket.h +++ b/iocore/net/P_UDPPacket.h @@ -26,7 +26,6 @@ P_UDPPacket.h Implementation of UDPPacket - ****************************************************************************/ @@ -35,9 +34,6 @@ #include "I_UDPNet.h" -//#define PACKETQUEUE_IMPL_AS_PQLIST -#define PACKETQUEUE_IMPL_AS_RING - class UDPPacketInternal:public UDPPacket { @@ -51,43 +47,32 @@ public: SLINK(UDPPacketInternal, alink); // atomic link // packet scheduling stuff: keep it a doubly linked list - uint64_t pktSendStartTime; - uint64_t pktSendFinishTime; uint64_t pktLength; int reqGenerationNum; ink_hrtime delivery_time; // when to deliver packet - ink_hrtime arrival_time; // when packet arrived Ptr<IOBufferBlock> chain; Continuation *cont; // callback on error UDPConnectionInternal *conn; // connection where packet should be sent to. -#if defined(PACKETQUEUE_IMPL_AS_PQLIST) || defined(PACKETQUEUE_IMPL_AS_RING) int in_the_priority_queue; int in_heap; -#endif - - virtual void UDPPacket_is_abstract() { } }; inkcoreapi extern ClassAllocator<UDPPacketInternal> udpPacketAllocator; TS_INLINE UDPPacketInternal::UDPPacketInternal() - : pktSendStartTime(0), pktSendFinishTime(0), pktLength(0), - reqGenerationNum(0), delivery_time(0), arrival_time(0), cont(NULL) , conn(NULL) -#if defined(PACKETQUEUE_IMPL_AS_PQLIST) || defined(PACKETQUEUE_IMPL_AS_RING) - ,in_the_priority_queue(0), in_heap(0) -#endif + : pktLength(0), reqGenerationNum(0), delivery_time(0), cont(NULL), + conn(NULL), in_the_priority_queue(0), in_heap(0) { memset(&from, '\0', sizeof(from)); memset(&to, '\0', sizeof(to)); } TS_INLINE -UDPPacketInternal::~ -UDPPacketInternal() +UDPPacketInternal::~UDPPacketInternal() { chain = NULL; } @@ -181,21 +166,13 @@ UDPPacket::getConnection(void) return ((UDPPacketInternal *) this)->conn; } -TS_INLINE void -UDPPacket::setArrivalTime(ink_hrtime t) -{ - ((UDPPacketInternal *) this)->arrival_time = t; -} - TS_INLINE UDPPacket * new_UDPPacket(struct sockaddr const* to, ink_hrtime when, char *buf, int len) { UDPPacketInternal *p = udpPacketAllocator.alloc(); -#if defined(PACKETQUEUE_IMPL_AS_PQLIST) || defined(PACKETQUEUE_IMPL_AS_RING) p->in_the_priority_queue = 0; p->in_heap = 0; -#endif p->delivery_time = when; ats_ip_copy(&p->to, to); @@ -217,10 +194,8 @@ new_UDPPacket(struct sockaddr const* to, ink_hrtime when, IOBufferBlock * buf, i UDPPacketInternal *p = udpPacketAllocator.alloc(); IOBufferBlock *body; -#if defined(PACKETQUEUE_IMPL_AS_PQLIST) || defined(PACKETQUEUE_IMPL_AS_RING) p->in_the_priority_queue = 0; p->in_heap = 0; -#endif p->delivery_time = when; ats_ip_copy(&p->to, to); @@ -237,10 +212,8 @@ new_UDPPacket(struct sockaddr const* to, ink_hrtime when, Ptr<IOBufferBlock> buf { UDPPacketInternal *p = udpPacketAllocator.alloc(); -#if defined(PACKETQUEUE_IMPL_AS_PQLIST) || defined(PACKETQUEUE_IMPL_AS_RING) p->in_the_priority_queue = 0; p->in_heap = 0; -#endif p->delivery_time = when; if (to) ats_ip_copy(&p->to, to); @@ -259,10 +232,8 @@ new_incoming_UDPPacket(struct sockaddr * from, char *buf, int len) { UDPPacketInternal *p = udpPacketAllocator.alloc(); -#if defined(PACKETQUEUE_IMPL_AS_PQLIST) || defined(PACKETQUEUE_IMPL_AS_RING) p->in_the_priority_queue = 0; p->in_heap = 0; -#endif p->delivery_time = 0; ats_ip_copy(&p->from, from); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/7a57cd36/iocore/net/UnixUDPNet.cc ---------------------------------------------------------------------- diff --git a/iocore/net/UnixUDPNet.cc b/iocore/net/UnixUDPNet.cc index a20c93f..fc9c31d 100644 --- a/iocore/net/UnixUDPNet.cc +++ b/iocore/net/UnixUDPNet.cc @@ -136,8 +136,6 @@ UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler * nh, // create packet UDPPacket *p = new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr), buf, r); p->setConnection(uc); - // XXX: is this expensive? I] really want to know this information - p->setArrivalTime(ink_get_hrtime_internal()); // queue onto the UDPConnection ink_atomiclist_push(&uc->inQueue, p); iters++; @@ -189,18 +187,21 @@ public: ~UDPReadContinuation(); inline void free(void); inline void init_token(Event * completionToken); + inline void init_read(int fd, IOBufferBlock * buf, int len, struct sockaddr *fromaddr, socklen_t *fromaddrlen); - inline void init_read(int fd, IOBufferBlock * buf, int len, struct sockaddr *fromaddr, socklen_t *fromaddrlen); // start up polling void set_timer(int seconds) { timeout_interval = HRTIME_SECONDS(seconds); } + void cancel(); int readPollEvent(int event, Event * e); + Action *getAction() { return event; } + void setupPollDescriptor(); private: @@ -222,8 +223,8 @@ ClassAllocator<UDPReadContinuation> udpReadContAllocator("udpReadContAllocator") #define UNINITIALIZED_EVENT_PTR (Event *)0xdeadbeef UDPReadContinuation::UDPReadContinuation(Event * completionToken) -: Continuation(NULL), event(completionToken), readbuf(NULL), - readlen(0), fromaddrlen(0), fd(-1), ifd(-1), period(0), elapsed_time(0), timeout_interval(0) + : Continuation(NULL), event(completionToken), readbuf(NULL), readlen(0), fromaddrlen(0), fd(-1), + ifd(-1), period(0), elapsed_time(0), timeout_interval(0) { if (completionToken->continuation) this->mutex = completionToken->continuation->mutex; @@ -232,10 +233,9 @@ UDPReadContinuation::UDPReadContinuation(Event * completionToken) } UDPReadContinuation::UDPReadContinuation() -: Continuation(NULL), event(UNINITIALIZED_EVENT_PTR), readbuf(NULL), - readlen(0), fromaddrlen(0), fd(-1), ifd(-1), period(0), elapsed_time(0), timeout_interval(0) -{ -} + : Continuation(NULL), event(UNINITIALIZED_EVENT_PTR), readbuf(NULL), readlen(0), fromaddrlen(0), fd(-1), + ifd(-1), period(0), elapsed_time(0), timeout_interval(0) +{ } inline void UDPReadContinuation::free(void) @@ -345,40 +345,39 @@ UDPReadContinuation::readPollEvent(int event_, Event * e) //ink_assert(ifd < 0 || event_ == EVENT_INTERVAL || (event_ == EVENT_POLL && pc->pollDescriptor->nfds > ifd && pc->pollDescriptor->pfd[ifd].fd == fd)); //if (ifd < 0 || event_ == EVENT_INTERVAL || (pc->pollDescriptor->pfd[ifd].revents & POLLIN)) { ink_debug_assert(!"incomplete"); + c = completionUtil::getContinuation(event); + // do read + socklen_t tmp_fromlen = *fromaddrlen; + int rlen = socketManager.recvfrom(fd, readbuf->end(), readlen, 0, ats_ip_sa_cast(fromaddr), &tmp_fromlen); + + completionUtil::setThread(event, e->ethread); + // call back user with their event + if (rlen > 0) { + // do callback if read is successful + *fromaddrlen = tmp_fromlen; + completionUtil::setInfo(event, fd, readbuf, rlen, errno); + readbuf->fill(rlen); + // TODO: Should we deal with the return code? + c->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event); + e->cancel(); + free(); + // delete this; + return EVENT_DONE; + } else if (rlen < 0 && rlen != -EAGAIN) { + // signal error. + *fromaddrlen = tmp_fromlen; + completionUtil::setInfo(event, fd, (IOBufferBlock *) readbuf, rlen, errno); c = completionUtil::getContinuation(event); - // do read - socklen_t tmp_fromlen = *fromaddrlen; - int rlen = socketManager.recvfrom(fd, readbuf->end(), readlen, - 0, // default flags - ats_ip_sa_cast(fromaddr), &tmp_fromlen); - completionUtil::setThread(event, e->ethread); - // call back user with their event - if (rlen > 0) { - // do callback if read is successful - *fromaddrlen = tmp_fromlen; - completionUtil::setInfo(event, fd, readbuf, rlen, errno); - readbuf->fill(rlen); - // TODO: Should we deal with the return code? - c->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event); - e->cancel(); - free(); - // delete this; - return EVENT_DONE; - } else if (rlen < 0 && rlen != -EAGAIN) { - // signal error. - *fromaddrlen = tmp_fromlen; - completionUtil::setInfo(event, fd, (IOBufferBlock *) readbuf, rlen, errno); - c = completionUtil::getContinuation(event); - // TODO: Should we deal with the return code? - c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event); - e->cancel(); - free(); - //delete this; - return EVENT_DONE; - } else { - completionUtil::setThread(event, NULL); - } -//} + // TODO: Should we deal with the return code? + c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event); + e->cancel(); + free(); + //delete this; + return EVENT_DONE; + } else { + completionUtil::setThread(event, NULL); + } + if (event->cancelled) { e->cancel(); free(); @@ -417,10 +416,11 @@ UDPNetProcessor::recvfrom_re(Continuation * cont, ink_assert(buf->write_avail() >= len); int actual; Event *event = completionUtil::create(); + completionUtil::setContinuation(event, cont); completionUtil::setHandle(event, token); - actual = socketManager.recvfrom(fd, buf->end(), len, 0, // default flags - fromaddr, fromaddrlen); + actual = socketManager.recvfrom(fd, buf->end(), len, 0, fromaddr, fromaddrlen); + if (actual > 0) { completionUtil::setThread(event, this_ethread()); completionUtil::setInfo(event, fd, buf, actual, errno); @@ -458,6 +458,7 @@ UDPNetProcessor::sendmsg_re(Continuation * cont, void *token, int fd, struct msg { int actual; Event *event = completionUtil::create(); + completionUtil::setContinuation(event, cont); completionUtil::setHandle(event, token); @@ -489,13 +490,13 @@ UDPNetProcessor::sendmsg_re(Continuation * cont, void *token, int fd, struct msg * */ Action * -UDPNetProcessor::sendto_re(Continuation * cont, - void *token, int fd, struct sockaddr const* toaddr, int toaddrlen, IOBufferBlock * buf, int len) +UDPNetProcessor::sendto_re(Continuation * cont, void *token, int fd, struct sockaddr const* toaddr, int toaddrlen, + IOBufferBlock * buf, int len) { (void) token; ink_assert(buf->read_avail() >= len); - int nbytes_sent = socketManager.sendto(fd, buf->start(), len, 0, - toaddr, toaddrlen); + int nbytes_sent = socketManager.sendto(fd, buf->start(), len, 0, toaddr, toaddrlen); + if (nbytes_sent >= 0) { ink_assert(nbytes_sent == len); buf->consume(nbytes_sent); @@ -509,14 +510,9 @@ UDPNetProcessor::sendto_re(Continuation * cont, bool -UDPNetProcessor::CreateUDPSocket( - int *resfd, - sockaddr const* remote_addr, - sockaddr* local_addr, - int *local_addr_len, - Action ** status, - int send_bufsize, int recv_bufsize -) { +UDPNetProcessor::CreateUDPSocket(int *resfd, sockaddr const* remote_addr, sockaddr* local_addr, + int *local_addr_len, Action ** status, int send_bufsize, int recv_bufsize) +{ int res = 0, fd = -1; ink_assert(ats_ip_are_compatible(remote_addr, local_addr)); @@ -630,15 +626,10 @@ Lerror: // send out all packets that need to be sent out as of time=now UDPQueue::UDPQueue() : last_report(0), last_service(0), packets(0), added(0) -{ -} +{ } UDPQueue::~UDPQueue() { - UDPPacketInternal *p; - - while ((p = reliabilityPktQueue.dequeue()) != NULL) - p->free(); } /* @@ -647,26 +638,18 @@ UDPQueue::~UDPQueue() void UDPQueue::service(UDPNetHandler * nh) { + (void) nh; ink_hrtime now = ink_get_hrtime_internal(); uint64_t timeSpent = 0; + uint64_t pktSendStartTime; UDPPacketInternal *p; ink_hrtime pktSendTime; double minPktSpacing; uint32_t pktSize; int64_t pktLen; - (void) nh; - static ink_hrtime lastPrintTime = ink_get_hrtime_internal(); - static ink_hrtime lastSchedTime = ink_get_hrtime_internal(); - static uint32_t schedJitter = 0; - static uint32_t numTimesSched = 0; - - schedJitter += ink_hrtime_to_msec(now - lastSchedTime); - numTimesSched++; - p = (UDPPacketInternal *) ink_atomiclist_popall(&atomicQueue); if (p) { - UDPPacketInternal *pnext = NULL; Queue<UDPPacketInternal> stk; @@ -676,6 +659,7 @@ UDPQueue::service(UDPNetHandler * nh) stk.push(p); p = pnext; } + // walk backwards down list since this is actually an atomic stack. while (stk.head) { p = stk.pop(); @@ -685,27 +669,20 @@ UDPQueue::service(UDPNetHandler * nh) Debug("udp-send", "Adding %p", p); pktLen = p->getPktLength(); if (p->conn->lastPktStartTime == 0) { - p->pktSendStartTime = MAX(now, p->delivery_time); + pktSendStartTime = MAX(now, p->delivery_time); } else { pktSize = MAX(INK_ETHERNET_MTU_SIZE, pktLen); minPktSpacing = 0.0; pktSendTime = p->delivery_time; - p->pktSendStartTime = MAX(MAX(now, pktSendTime), p->delivery_time); + pktSendStartTime = MAX(MAX(now, pktSendTime), p->delivery_time); } - p->conn->lastPktStartTime = p->pktSendStartTime; - p->delivery_time = p->pktSendStartTime; + p->conn->lastPktStartTime = pktSendStartTime; + p->delivery_time = pktSendStartTime; G_inkPipeInfo.addPacket(p, now); } } - if ((now - lastPrintTime) > ink_hrtime_from_sec(30)) { - Debug("udp-sched-jitter", "avg. udp sched jitter: %f", (double) schedJitter / numTimesSched); - schedJitter = 0; - numTimesSched = 0; - lastPrintTime = now; - } - G_inkPipeInfo.advanceNow(now); SendPackets(); @@ -733,21 +710,6 @@ UDPQueue::SendPackets() if (now > last_service) timeDelta = ink_hrtime_to_msec(now - last_service); - while ((p = reliabilityPktQueue.dequeue()) != NULL) { - pktLen = p->getPktLength(); - if (p->conn->shouldDestroy()) - goto next_pkt_3; - if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum) - goto next_pkt_3; - - SendUDPPacket(p, pktLen); - bytesThisSlot -= pktLen; - if (bytesThisSlot < 0) - break; - next_pkt_3: - p->free(); - } - bytesThisSlot = INT_MAX; sendPackets:
