This is an automated email from the ASF dual-hosted git repository. scw00 pushed a commit to branch quic-latest in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit c85f903a11d12b4e1505bd68603f2c157805c4a8 Author: scw00 <[email protected]> AuthorDate: Sun Jan 28 21:22:41 2018 +0800 complete pollcont --- cmd/traffic_quic/quic_client.h | 1 + cmd/traffic_quic/traffic_quic.cc | 2 + iocore/net/I_UDPPacket.h | 2 +- iocore/net/Makefile.am | 2 + iocore/net/P_Net.h | 1 + iocore/net/P_QUICNet.h | 6 ++ iocore/net/P_QUICNetProcessor.h | 1 + iocore/net/P_QUICNetVConnection.h | 3 + iocore/net/QUICNet.cc | 141 +++++++++++++++++++++++--------------- iocore/net/QUICNetProcessor.cc | 10 +++ iocore/net/QUICNetVConnection.cc | 1 + iocore/net/QUICPacketHandler.cc | 14 ++-- proxy/Main.cc | 4 ++ 13 files changed, 129 insertions(+), 59 deletions(-) diff --git a/cmd/traffic_quic/quic_client.h b/cmd/traffic_quic/quic_client.h index 05d0c47..575d7a1 100644 --- a/cmd/traffic_quic/quic_client.h +++ b/cmd/traffic_quic/quic_client.h @@ -23,6 +23,7 @@ #pragma once +#include "P_Net.h" #include "I_EventSystem.h" #include "I_NetVConnection.h" #include "P_QUICNetProcessor.h" diff --git a/cmd/traffic_quic/traffic_quic.cc b/cmd/traffic_quic/traffic_quic.cc index 5e48349..19ea9c2 100644 --- a/cmd/traffic_quic/traffic_quic.cc +++ b/cmd/traffic_quic/traffic_quic.cc @@ -78,6 +78,8 @@ main(int argc, const char **argv) SSLInitializeLibrary(); SSLConfig::startup(); + quic_NetProcessor.init(); + ink_event_system_init(EVENT_SYSTEM_MODULE_VERSION); eventProcessor.start(THREADS); udpNet.start(1, stacksize); diff --git a/iocore/net/I_UDPPacket.h b/iocore/net/I_UDPPacket.h index 271a250..fd047e5 100644 --- a/iocore/net/I_UDPPacket.h +++ b/iocore/net/I_UDPPacket.h @@ -63,7 +63,7 @@ public: int from_size; typedef union udppacket_data { - void *ptr; + void *ptr; uint32_t u32; uint64_t u64; } udppacket_data_t; diff --git a/iocore/net/Makefile.am b/iocore/net/Makefile.am index b044e80..f32cc13 100644 --- a/iocore/net/Makefile.am +++ b/iocore/net/Makefile.am @@ -162,10 +162,12 @@ libinknet_a_SOURCES = \ if ENABLE_QUIC libinknet_a_SOURCES += \ P_QUICPacketHandler.h \ + P_QUICNet.h \ P_QUICNetProcessor.h \ P_QUICNetVConnection.h \ P_QUICNextProtocolAccept.h \ QUICPacketHandler.cc \ + QUICNet.cc \ QUICNetProcessor.cc \ QUICNetVConnection.cc \ QUICNextProtocolAccept.cc diff --git a/iocore/net/P_Net.h b/iocore/net/P_Net.h index 0a7a502..38504ee 100644 --- a/iocore/net/P_Net.h +++ b/iocore/net/P_Net.h @@ -113,6 +113,7 @@ extern RecRawStatBlock *net_rsb; #include "P_QUICNetVConnection.h" #include "P_QUICNetProcessor.h" #include "P_QUICPacketHandler.h" +#include "P_QUICNet.h" #endif // #include "P_QUICCertLookup.h" diff --git a/iocore/net/P_QUICNet.h b/iocore/net/P_QUICNet.h index fba0e96..7231207 100644 --- a/iocore/net/P_QUICNet.h +++ b/iocore/net/P_QUICNet.h @@ -32,6 +32,8 @@ class NetHandler; typedef int (NetHandler::*NetContHandler)(int, void *); +void initialize_thread_for_quic_net(EThread *thread); + struct QUICPollCont : public Continuation { NetHandler *net_handler; PollDescriptor *pollDescriptor; @@ -49,6 +51,10 @@ public: Que(UDPPacket, link) longInQueue; // Internal Queue to save Short Header Packet Que(UDPPacket, link) shortInQueue; + +private: + void _process_short_header_packet(UDPPacketInternal *p, NetHandler *nh); + void _process_long_header_packet(UDPPacketInternal *p, NetHandler *nh); }; static inline QUICPollCont * diff --git a/iocore/net/P_QUICNetProcessor.h b/iocore/net/P_QUICNetProcessor.h index 2b7eb10..c9d6c2a 100644 --- a/iocore/net/P_QUICNetProcessor.h +++ b/iocore/net/P_QUICNetProcessor.h @@ -56,6 +56,7 @@ public: QUICNetProcessor(); virtual ~QUICNetProcessor(); + void init() override; virtual int start(int, size_t stacksize) override; void cleanup(); // TODO: refactoring NetProcessor::connect_re and UnixNetProcessor::connect_re_internal diff --git a/iocore/net/P_QUICNetVConnection.h b/iocore/net/P_QUICNetVConnection.h index 4e999d9..8f4d7e5 100644 --- a/iocore/net/P_QUICNetVConnection.h +++ b/iocore/net/P_QUICNetVConnection.h @@ -145,6 +145,9 @@ public: QUICNetVConnection() {} void init(QUICConnectionId original_cid, UDPConnection *, QUICPacketHandler *, QUICConnectionTable *ctable = nullptr); + // accept new conn_id + int acceptEvent(int event, Event *e); + // UnixNetVConnection void reenable(VIO *vio) override; VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) override; diff --git a/iocore/net/QUICNet.cc b/iocore/net/QUICNet.cc index c40121f..3e730af 100644 --- a/iocore/net/QUICNet.cc +++ b/iocore/net/QUICNet.cc @@ -23,14 +23,12 @@ #include "P_Net.h" -QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m) - : Continuation(m.get()), net_handler(nullptr) +QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m) : Continuation(m.get()), net_handler(nullptr) { - SET_HANDLER(&PollCont::pollEvent); + SET_HANDLER(&QUICPollCont::pollEvent); } -QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m, NetHandler *nh) - : Continuation(m.get()), net_handler(nh) +QUICPollCont::QUICPollCont(Ptr<ProxyMutex> &m, NetHandler *nh) : Continuation(m.get()), net_handler(nh) { SET_HANDLER(&QUICPollCont::pollEvent); } @@ -39,6 +37,83 @@ QUICPollCont::~QUICPollCont() { } +void +QUICPollCont::_process_long_header_packet(UDPPacketInternal *p, NetHandler *nh) +{ + QUICNetVConnection *vc; + QUICPacketType ptype; + uint8_t *buf; + + // FIXME: VC is nullptr ? + vc = static_cast<QUICNetVConnection *>(p->data.ptr); + buf = (uint8_t *)p->getIOBlockChain()->buf(); + if (!QUICTypeUtil::has_connection_id(reinterpret_cast<const uint8_t *>(buf))) { + // TODO: Some packets may not have connection id + p->free(); + return; + } + + ptype = static_cast<QUICPacketType>(buf[0] & 0x7f); + switch (ptype) { + case QUICPacketType::INITIAL: + vc->read.triggered = 1; + vc->handle_received_packet(p); + this->mutex->thread_holding->schedule_imm(vc); + return; + case QUICPacketType::ZERO_RTT_PROTECTED: + // TODO:: do something ? + // break; + case QUICPacketType::HANDSHAKE: + default: + // Just Pass Through + if (vc) { + vc->read.triggered = 1; + vc->handle_received_packet(p); + } else { + longInQueue.push(p); + } + + // Push QUICNetVC into nethandler's enabled list + if (vc != nullptr) { + int isin = ink_atomic_swap(&vc->read.in_enabled_list, 1); + if (!isin) { + nh->read_enable_list.push(vc); + } + } + break; + } +} + +void +QUICPollCont::_process_short_header_packet(UDPPacketInternal *p, NetHandler *nh) +{ + QUICNetVConnection *vc; + uint8_t *buf; + + vc = static_cast<QUICNetVConnection *>(p->data.ptr); + buf = (uint8_t *)p->getIOBlockChain()->buf(); + if (!QUICTypeUtil::has_connection_id(reinterpret_cast<const uint8_t *>(buf))) { + // TODO: Some packets may not have connection id + p->free(); + return; + } + + if (vc) { + vc->read.triggered = 1; + vc->handle_received_packet(p); + } else { + shortInQueue.push(p); + } + + // Push QUICNetVC into nethandler's enabled list + if (vc != nullptr) { + int isin = ink_atomic_swap(&vc->read.in_enabled_list, 1); + if (!isin) { + nh->read_enable_list.push(vc); + } + } +} + // // QUICPollCont continuation which traverse the inQueue(ASLL) // and create new QUICNetVC for Initial Packet, @@ -47,15 +122,10 @@ QUICPollCont::~QUICPollCont() int QUICPollCont::pollEvent(int, Event *) { - UnixUDPConnection *uc; - QUICPacketHandler *ph; - QUICNetVConnection *vc; - QUICConnectionId cid; + ink_assert(this->mutex->thread_holding == this_thread()); uint8_t *buf; - uint8_t ptype; - UDPPacket *packet_r; UDPPacketInternal *p = nullptr; - NetHandler *nh = get_NetHandler(t); + NetHandler *nh = get_NetHandler(this->mutex->thread_holding); // Process the ASLL SList(UDPPacketInternal, alink) aq(inQueue.popall()); @@ -65,50 +135,14 @@ QUICPollCont::pollEvent(int, Event *) } while ((p = result.pop())) { - uc = static_cast<UnixUDPConnection *>(p->getConnection()); - ph = static_cast<QUICPacketHandler *>(uc->continuation); - vc = static_cast<QUICNetVConnection *>(p->data.ptr); buf = (uint8_t *)p->getIOBlockChain()->buf(); - cid = QUICPacket::connection_id(buf) - if (buf[0] & 0x80) { // Long Header Packet with Connection ID, has a valid type value. - ptype = buf[0] & 0x7f; - if (ptype == QUICPacketType::INITIAL) { // Initial Packet - vc->read.triggered = 1; - vc->push_packet(p); - // reschedule the vc and callback vc->acceptEvent - this_ethread()->schedule_imm(vc); - } elseif (ptype == QUICPacketType::ZERO_RTT_PROTECTED) { // 0-RTT Packet - // TODO: - } elseif (ptype == QUICPacketType::HANDSHAKE) { // Handshake Packet - if (vc) { - vc->read.triggered = 1; - vc->push_packet(p); - } else { - longInQueue.push(p); - } - } else { - ink_assert(!"not reached!"); - } - } elseif (buf[0] & 0x40) { // Short Header Packet with Connection ID, has a valid type value. - if (vc) { - vc->read.triggered = 1; - vc->push_packet(p); - } else { - shortInQueue.push(p); - } - } else { - ink_assert(!"not reached!"); - } - - // Push QUICNetVC into nethandler's enabled list - if (vc != nullptr) { - int isin = ink_atomic_swap(&vc->read.in_enabled_list, 1); - if (!isin) { - nh->read_enable_list.push(vc); - } + if (QUICTypeUtil::has_long_header(buf)) { // Long Header Packet with Connection ID, has a valid type value. + this->_process_long_header_packet(p, nh); + } else { // Short Header Packet with Connection ID, has a valid type value. + this->_process_short_header_packet(p, nh); } } - + return EVENT_CONT; } @@ -122,4 +156,3 @@ initialize_thread_for_quic_net(EThread *thread) thread->schedule_every(quicpc, -9); } - diff --git a/iocore/net/QUICNetProcessor.cc b/iocore/net/QUICNetProcessor.cc index 8dc335f..da372c6 100644 --- a/iocore/net/QUICNetProcessor.cc +++ b/iocore/net/QUICNetProcessor.cc @@ -50,6 +50,16 @@ QUICNetProcessor::cleanup() SSL_CTX_free(this->_ssl_ctx); } +void +QUICNetProcessor::init() +{ + // first we allocate a QUICPollCont. + this->quicPollCont_offset = eventProcessor.allocate(sizeof(QUICPollCont)); + + // schedule event + eventProcessor.schedule_spawn(&initialize_thread_for_quic_net, ET_NET); +} + int QUICNetProcessor::start(int, size_t stacksize) { diff --git a/iocore/net/QUICNetVConnection.cc b/iocore/net/QUICNetVConnection.cc index b0088c8..e71c109 100644 --- a/iocore/net/QUICNetVConnection.cc +++ b/iocore/net/QUICNetVConnection.cc @@ -117,6 +117,7 @@ QUICNetVConnection::acceptEvent(int event, Event *e) free(t); return EVENT_DONE; } + this->read.enabled = 1; // Handshake callback handler. SET_HANDLER((NetVConnHandler)&QUICNetVConnection::state_pre_handshake); diff --git a/iocore/net/QUICPacketHandler.cc b/iocore/net/QUICPacketHandler.cc index 8ac76c0..a09c394 100644 --- a/iocore/net/QUICPacketHandler.cc +++ b/iocore/net/QUICPacketHandler.cc @@ -125,7 +125,8 @@ void QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) { EThread *eth; - IOBufferBlock *block = udp_packet->getIOBlockChain(); + QUICNetVConnection *vc = nullptr; + IOBufferBlock *block = udp_packet->getIOBlockChain(); if (is_debug_tag_set("quic_sec")) { ip_port_text_buffer ipb; @@ -163,7 +164,7 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) // Create a new NetVConnection QUICConnectionId original_cid = this->_read_connection_id(block); - QUICNetVConnection *vc = static_cast<QUICNetVConnection *>(getNetProcessor()->allocate_vc(nullptr)); + vc = static_cast<QUICNetVConnection *>(getNetProcessor()->allocate_vc(nullptr)); vc->init(original_cid, udp_packet->getConnection(), this, &this->_ctable); vc->id = net_next_connection_number(); vc->con.move(con); @@ -173,16 +174,21 @@ QUICPacketHandlerIn::_recv_packet(int event, UDPPacket *udp_packet) vc->action_ = *this->action_; vc->set_is_transparent(this->opt.f_inbound_transparent); vc->set_context(NET_VCONNECTION_IN); + vc->read.triggered = 1; + vc->start(this->_ssl_ctx); vc->options.ip_proto = NetVCOptions::USE_UDP; vc->options.ip_family = udp_packet->from.sa.sa_family; qc = vc; + } else { + vc = static_cast<QUICNetVConnection *>(qc); + eth = vc->thread; } // Push the packet into QUICPollCont udp_packet->data.ptr = vc; - get_QUICPollCont(eth)->inQueue.push(udp_packet); - + // should we use dynamic_cast ?? + get_QUICPollCont(eth)->inQueue.push(static_cast<UDPPacketInternal *>(udp_packet)); } // TODO: Should be called via eventProcessor? diff --git a/proxy/Main.cc b/proxy/Main.cc index 5e36c0e..5614444 100644 --- a/proxy/Main.cc +++ b/proxy/Main.cc @@ -1800,6 +1800,10 @@ main(int /* argc ATS_UNUSED */, const char **argv) // Do the inits for NetProcessors that use ET_NET threads. MUST be before starting those threads. netProcessor.init(); init_HttpProxyServer(); +#if TS_USE_QUIC == 1 + // OK, pushing a spawn scheduling here + quic_NetProcessor.init(); +#endif // !! ET_NET threads start here !! // This means any spawn scheduling must be done before this point. -- To stop receiving notification emails like this one, please contact [email protected].
