This is an automated email from the ASF dual-hosted git repository. scw00 pushed a commit to branch quic-latest in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit d9a5b22299a8d9dbcc1d7231ae806ab692669166 Author: Oknet Xu <[email protected]> AuthorDate: Sun Jan 28 17:55:16 2018 +0800 Add QUICPollCont --- iocore/net/I_UDPPacket.h | 6 ++ iocore/net/P_QUICNet.h | 60 ++++++++++++++++++ iocore/net/P_QUICNetProcessor.h | 2 + iocore/net/P_QUICNetVConnection.h | 2 + iocore/net/P_UnixNet.h | 24 +++++++- iocore/net/QUICNet.cc | 125 ++++++++++++++++++++++++++++++++++++++ iocore/net/QUICNetVConnection.cc | 50 ++++++++++++++- iocore/net/QUICPacketHandler.cc | 23 +++---- 8 files changed, 274 insertions(+), 18 deletions(-) diff --git a/iocore/net/I_UDPPacket.h b/iocore/net/I_UDPPacket.h index df899ff..271a250 100644 --- a/iocore/net/I_UDPPacket.h +++ b/iocore/net/I_UDPPacket.h @@ -62,6 +62,12 @@ public: IpEndpoint to; // what address to send to int from_size; + typedef union udppacket_data { + void *ptr; + uint32_t u32; + uint64_t u64; + } udppacket_data_t; + udppacket_data_t data; LINK(UDPPacket, link); }; diff --git a/iocore/net/P_QUICNet.h b/iocore/net/P_QUICNet.h new file mode 100644 index 0000000..fba0e96 --- /dev/null +++ b/iocore/net/P_QUICNet.h @@ -0,0 +1,60 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#ifndef __P_QUICNET_H__ +#define __P_QUICNET_H__ + +#include <bitset> + +#include "ts/ink_platform.h" + +#include "P_Net.h" +class NetHandler; +typedef int (NetHandler::*NetContHandler)(int, void *); + +struct QUICPollCont : public Continuation { + NetHandler *net_handler; + PollDescriptor *pollDescriptor; + + QUICPollCont(Ptr<ProxyMutex> &m); + QUICPollCont(Ptr<ProxyMutex> &m, NetHandler *nh); + ~QUICPollCont(); + int pollEvent(int, Event *); + +public: + // Atomic Queue to save incoming packets + ASLL(UDPPacketInternal, alink) inQueue; + + // Internal Queue to save Long Header Packet + Que(UDPPacket, link) longInQueue; + // Internal Queue to save Short Header Packet + Que(UDPPacket, link) shortInQueue; +}; + +static inline QUICPollCont * +get_QUICPollCont(EThread *t) +{ + return (QUICPollCont *)ETHREAD_GET_PTR(t, quic_NetProcessor.quicPollCont_offset); +} + +#endif diff --git a/iocore/net/P_QUICNetProcessor.h b/iocore/net/P_QUICNetProcessor.h index 2972060..2b7eb10 100644 --- a/iocore/net/P_QUICNetProcessor.h +++ b/iocore/net/P_QUICNetProcessor.h @@ -67,6 +67,8 @@ public: Action *main_accept(Continuation *cont, SOCKET fd, AcceptOptions const &opt) override; + off_t quicPollCont_offset; + private: QUICNetProcessor(const QUICNetProcessor &); QUICNetProcessor &operator=(const QUICNetProcessor &); diff --git a/iocore/net/P_QUICNetVConnection.h b/iocore/net/P_QUICNetVConnection.h index a82c37b..4e999d9 100644 --- a/iocore/net/P_QUICNetVConnection.h +++ b/iocore/net/P_QUICNetVConnection.h @@ -302,4 +302,6 @@ private: QUICStatelessResetToken _reset_token; }; +typedef int (QUICNetVConnection::*QUICNetVConnHandler)(int, void *); + extern ClassAllocator<QUICNetVConnection> quicNetVCAllocator; diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h index 6c69bd3..6b9ad5f 100644 --- a/iocore/net/P_UnixNet.h +++ b/iocore/net/P_UnixNet.h @@ -575,8 +575,15 @@ EventIO::start(EventLoop l, NetAccept *vc, int events) TS_INLINE int EventIO::start(EventLoop l, UnixNetVConnection *vc, int events) { + int r; type = EVENTIO_READWRITE_VC; - return start(l, vc->con.fd, (Continuation *)vc, events); + r = start(l, vc->con.fd, (Continuation *)vc, events); + if (r < 0 && vc->options.ip_proto == NetVCOptions::USE_UDP) { + // Hack for QUICNetVC + return 0; + } else { + return r; + } } TS_INLINE int EventIO::start(EventLoop l, UnixUDPConnection *vc, int events) @@ -611,6 +618,12 @@ EventIO::start(EventLoop l, int afd, Continuation *c, int e) data.c = c; fd = afd; event_loop = l; + // Hack for QUICNetVC: + // quicnetvc->con.fd == NO_FD + // quicnetvc->options.ip_proto == NetVCOptions::USE_UDP + if (afd == NO_FD) { + return -1; + } #if TS_USE_EPOLL struct epoll_event ev; memset(&ev, 0, sizeof(ev)); @@ -643,6 +656,9 @@ EventIO::start(EventLoop l, int afd, Continuation *c, int e) TS_INLINE int EventIO::modify(int e) { + if (fd == NO_FD) { + return 0; + } ink_assert(event_loop); #if TS_USE_EPOLL && !defined(USE_EDGE_TRIGGER) struct epoll_event ev; @@ -722,6 +738,9 @@ EventIO::modify(int e) TS_INLINE int EventIO::refresh(int e) { + if (fd == NO_FD) { + return 0; + } ink_assert(event_loop); #if TS_USE_KQUEUE && defined(USE_EDGE_TRIGGER) e = e & events; @@ -763,6 +782,9 @@ EventIO::refresh(int e) TS_INLINE int EventIO::stop() { + if (fd == NO_FD) { + return 0; + } if (event_loop) { int retval = 0; #if TS_USE_EPOLL diff --git a/iocore/net/QUICNet.cc b/iocore/net/QUICNet.cc new file mode 100644 index 0000000..c40121f --- /dev/null +++ b/iocore/net/QUICNet.cc @@ -0,0 +1,125 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include "P_Net.h" + +QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m) + : Continuation(m.get()), net_handler(nullptr) +{ + SET_HANDLER(&PollCont::pollEvent); +} + +QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m, NetHandler *nh) + : Continuation(m.get()), net_handler(nh) +{ + SET_HANDLER(&QUICPollCont::pollEvent); +} + +QUICPollCont::~QUICPollCont() +{ +} + +// +// QUICPollCont continuation which traverse the inQueue(ASLL) +// and create new QUICNetVC for Initial Packet, +// and push the triggered QUICNetVC into enable list. +// +int +QUICPollCont::pollEvent(int, Event *) +{ + UnixUDPConnection *uc; + QUICPacketHandler *ph; + QUICNetVConnection *vc; + QUICConnectionId cid; + uint8_t *buf; + uint8_t ptype; + UDPPacket *packet_r; + UDPPacketInternal *p = nullptr; + NetHandler *nh = get_NetHandler(t); + + // Process the ASLL + SList(UDPPacketInternal, alink) aq(inQueue.popall()); + Queue<UDPPacketInternal> result; + while ((p = aq.pop())) { + result.push(p); + } + + while ((p = result.pop())) { + uc = static_cast<UnixUDPConnection *>(p->getConnection()); + ph = static_cast<QUICPacketHandler *>(uc->continuation); + vc = static_cast<QUICNetVConnection *>(p->data.ptr); + buf = (uint8_t *)p->getIOBlockChain()->buf(); + cid = QUICPacket::connection_id(buf) + if (buf[0] & 0x80) { // Long Header Packet with Connection ID, has a valid type value. + ptype = buf[0] & 0x7f; + if (ptype == QUICPacketType::INITIAL) { // Initial Packet + vc->read.triggered = 1; + vc->push_packet(p); + // reschedule the vc and callback vc->acceptEvent + this_ethread()->schedule_imm(vc); + } elseif (ptype == QUICPacketType::ZERO_RTT_PROTECTED) { // 0-RTT Packet + // TODO: + } elseif (ptype == QUICPacketType::HANDSHAKE) { // Handshake Packet + if (vc) { + vc->read.triggered = 1; + vc->push_packet(p); + } else { + longInQueue.push(p); + } + } else { + ink_assert(!"not reached!"); + } + } elseif (buf[0] & 0x40) { // Short Header Packet with Connection ID, has a valid type value. + if (vc) { + vc->read.triggered = 1; + vc->push_packet(p); + } else { + shortInQueue.push(p); + } + } else { + ink_assert(!"not reached!"); + } + + // Push QUICNetVC into nethandler's enabled list + if (vc != nullptr) { + int isin = ink_atomic_swap(&vc->read.in_enabled_list, 1); + if (!isin) { + nh->read_enable_list.push(vc); + } + } + } + + return EVENT_CONT; +} + +void +initialize_thread_for_quic_net(EThread *thread) +{ + NetHandler *nh = get_NetHandler(thread); + QUICPollCont *quicpc = get_QUICPollCont(thread); + + new ((ink_dummy_for_new *)quicpc) QUICPollCont(thread->mutex, nh); + + thread->schedule_every(quicpc, -9); +} + diff --git a/iocore/net/QUICNetVConnection.cc b/iocore/net/QUICNetVConnection.cc index 3967513..b0088c8 100644 --- a/iocore/net/QUICNetVConnection.cc +++ b/iocore/net/QUICNetVConnection.cc @@ -63,7 +63,7 @@ void QUICNetVConnection::init(QUICConnectionId original_cid, UDPConnection *udp_con, QUICPacketHandler *packet_handler, QUICConnectionTable *ctable) { - SET_HANDLER((NetVConnHandler)&QUICNetVConnection::state_pre_handshake); + SET_HANDLER((NetVConnHandler)&QUICNetVConnection::acceptEvent); this->_packet_transmitter_mutex = new_ProxyMutex(); this->_frame_transmitter_mutex = new_ProxyMutex(); this->_udp_con = udp_con; @@ -94,6 +94,51 @@ QUICNetVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader } int +QUICNetVConnection::acceptEvent(int event, Event *e) +{ + EThread *t = (e == nullptr) ? this_ethread() : e->ethread; + NetHandler *h = get_NetHandler(t); + + MUTEX_TRY_LOCK(lock, h->mutex, t); + if (!lock.is_locked()) { + if (event == EVENT_NONE) { + t->schedule_in(this, HRTIME_MSECONDS(net_retry_delay)); + return EVENT_DONE; + } else { + e->schedule_in(HRTIME_MSECONDS(net_retry_delay)); + return EVENT_CONT; + } + } + + thread = t; + + // Send this NetVC to NetHandler and start to polling read & write event. + if (h->startIO(this) < 0) { + free(t); + return EVENT_DONE; + } + + // Handshake callback handler. + SET_HANDLER((NetVConnHandler)&QUICNetVConnection::state_pre_handshake); + + // Send this netvc to InactivityCop. + nh->startCop(this); + + if (inactivity_timeout_in) { + set_inactivity_timeout(inactivity_timeout_in); + } else { + set_inactivity_timeout(0); + } + + if (active_timeout_in) { + set_active_timeout(active_timeout_in); + } + + action_.continuation->handleEvent(NET_EVENT_ACCEPT, this); + return EVENT_DONE; +} + +int QUICNetVConnection::startEvent(int /*event ATS_UNUSED */, Event *e) { return EVENT_DONE; @@ -589,8 +634,7 @@ QUICNetVConnection::get_udp_con() void QUICNetVConnection::net_read_io(NetHandler *nh, EThread *lthread) { - ink_assert(false); - + this->handleEvent(QUIC_EVENT_PACKET_READ_READY, nullptr); return; } diff --git a/iocore/net/QUICPacketHandler.cc b/iocore/net/QUICPacketHandler.cc index e60b442..8ac76c0 100644 --- a/iocore/net/QUICPacketHandler.cc +++ b/iocore/net/QUICPacketHandler.cc @@ -124,6 +124,7 @@ QUICPacketHandlerIn::init_accept(EThread *t = nullptr) void QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) { + EThread *eth; IOBufferBlock *block = udp_packet->getIOBlockChain(); if (is_debug_tag_set("quic_sec")) { @@ -158,6 +159,8 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) return; } + eth = eventProcessor.assign_thread(ET_NET); + // Create a new NetVConnection QUICConnectionId original_cid = this->_read_connection_id(block); QUICNetVConnection *vc = static_cast<QUICNetVConnection *>(getNetProcessor()->allocate_vc(nullptr)); @@ -165,29 +168,21 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) vc->id = net_next_connection_number(); vc->con.move(con); vc->submit_time = Thread::get_hrtime(); - vc->mutex = this->mutex; + vc->thread = eth; + vc->mutex = new_ProxyMutex(); vc->action_ = *this->action_; vc->set_is_transparent(this->opt.f_inbound_transparent); vc->set_context(NET_VCONNECTION_IN); - vc->read.triggered = 1; - vc->start(this->_ssl_ctx); vc->options.ip_proto = NetVCOptions::USE_UDP; vc->options.ip_family = udp_packet->from.sa.sa_family; - this->action_->continuation->handleEvent(NET_EVENT_ACCEPT, vc); qc = vc; } - if (qc->is_closed()) { - this->_ctable.erase(qc->connection_id(), qc); - // FIXME QUICNetVConnection is NOT freed to prevent crashes. #2674 - // QUICNetVConnections are going to be freed by QUICNetHandler - // vc->free(vc->thread); - } else { - qc->handle_received_packet(udp_packet); - // FIXME This cast is temporal. It'll be removed when we introduce QUICNetHandler. - eventProcessor.schedule_imm(static_cast<QUICNetVConnection *>(qc), ET_CALL, QUIC_EVENT_PACKET_READ_READY, nullptr); - } + // Push the packet into QUICPollCont + udp_packet->data.ptr = vc; + get_QUICPollCont(eth)->inQueue.push(udp_packet); + } // TODO: Should be called via eventProcessor? -- To stop receiving notification emails like this one, please contact [email protected].
