This is an automated email from the ASF dual-hosted git repository. maskit pushed a commit to branch quic-latest in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit f0f998cc16f6e4e4368ff8244430e906fc0b81f4 Author: Masakazu Kitajo <mas...@apache.org> AuthorDate: Fri Sep 1 16:45:19 2017 +0900 Reimplement flow control with QUICFlowController --- .gitignore | 1 + iocore/net/P_QUICNetVConnection.h | 18 +- iocore/net/QUICNetVConnection.cc | 83 +++++++- iocore/net/quic/Makefile.am | 1 + iocore/net/quic/Mock.h | 7 +- iocore/net/quic/QUICFlowController.cc | 122 ++++++++++++ iocore/net/quic/QUICFlowController.h | 106 ++++++++++ iocore/net/quic/QUICStream.cc | 168 ++++++---------- iocore/net/quic/QUICStream.h | 29 ++- iocore/net/quic/QUICStreamManager.cc | 99 ++++------ iocore/net/quic/QUICStreamManager.h | 14 +- iocore/net/quic/test/Makefile.am | 35 +++- iocore/net/quic/test/test_QUICFlowController.cc | 252 ++++++++++++++++++++++++ iocore/net/quic/test/test_QUICStream.cc | 9 +- 14 files changed, 731 insertions(+), 213 deletions(-) diff --git a/.gitignore b/.gitignore index a5a618f..e68e278 100644 --- a/.gitignore +++ b/.gitignore @@ -99,6 +99,7 @@ iocore/net/quic/test/test_QUICLossDetector iocore/net/quic/test/test_QUICTypeUtil iocore/net/quic/test/test_QUICAckFrameCreator iocore/net/quic/test/test_QUICVersionNegotiator +iocore/net/quic/test/test_QUICFlowController iocore/net/quic/ts_quic_client iocore/aio/test_AIO iocore/eventsystem/test_Buffer diff --git a/iocore/net/P_QUICNetVConnection.h b/iocore/net/P_QUICNetVConnection.h index f256d78..8ee7cdf 100644 --- a/iocore/net/P_QUICNetVConnection.h +++ b/iocore/net/P_QUICNetVConnection.h @@ -203,13 +203,15 @@ private: // TODO: use custom allocator and make them std::unique_ptr or std::shared_ptr // or make them just member variables. 
- QUICVersionNegotiator *_version_negotiator = nullptr; - QUICHandshake *_handshake_handler = nullptr; - QUICCrypto *_crypto = nullptr; - QUICLossDetector *_loss_detector = nullptr; - QUICFrameDispatcher *_frame_dispatcher = nullptr; - QUICStreamManager *_stream_manager = nullptr; - QUICCongestionController *_congestion_controller = nullptr; + QUICVersionNegotiator *_version_negotiator = nullptr; + QUICHandshake *_handshake_handler = nullptr; + QUICCrypto *_crypto = nullptr; + QUICLossDetector *_loss_detector = nullptr; + QUICFrameDispatcher *_frame_dispatcher = nullptr; + QUICStreamManager *_stream_manager = nullptr; + QUICCongestionController *_congestion_controller = nullptr; + QUICRemoteFlowController *_remote_flow_controller = nullptr; + QUICLocalFlowController *_local_flow_controller = nullptr; Queue<QUICPacket> _packet_recv_queue; Queue<QUICPacket> _packet_send_queue; @@ -231,6 +233,8 @@ private: Ptr<ProxyMutex> _transmitter_mutex; QUICApplication *_create_application(); + void _init_flow_control_params(const std::shared_ptr<const QUICTransportParameters> &local_tp, + const std::shared_ptr<const QUICTransportParameters> &remote_tp); }; extern ClassAllocator<QUICNetVConnection> quicNetVCAllocator; diff --git a/iocore/net/QUICNetVConnection.cc b/iocore/net/QUICNetVConnection.cc index 0effe5f..f5a15ba 100644 --- a/iocore/net/QUICNetVConnection.cc +++ b/iocore/net/QUICNetVConnection.cc @@ -104,14 +104,19 @@ QUICNetVConnection::start(SSL_CTX *ssl_ctx) this->_packet_factory.set_crypto_module(this->_crypto); // Create frame handlers - this->_stream_manager = new QUICStreamManager(this, this->_application_map); - this->_congestion_controller = new QUICCongestionController(); - this->_loss_detector = new QUICLossDetector(this); + this->_stream_manager = new QUICStreamManager(this, this->_application_map); + this->_congestion_controller = new QUICCongestionController(); + this->_loss_detector = new QUICLossDetector(this); + this->_remote_flow_controller = new 
QUICRemoteConnectionFlowController(0, this); + this->_local_flow_controller = new QUICLocalConnectionFlowController(0, this); this->_frame_dispatcher->add_handler(this); this->_frame_dispatcher->add_handler(this->_stream_manager); this->_frame_dispatcher->add_handler(this->_congestion_controller); this->_frame_dispatcher->add_handler(this->_loss_detector); + + this->_init_flow_control_params(this->_handshake_handler->local_transport_parameters(), + this->_handshake_handler->remote_transport_parameters()); } void @@ -255,7 +260,7 @@ QUICNetVConnection::close(QUICError error) std::vector<QUICFrameType> QUICNetVConnection::interests() { - return {QUICFrameType::CONNECTION_CLOSE}; + return {QUICFrameType::CONNECTION_CLOSE, QUICFrameType::BLOCKED, QUICFrameType::MAX_DATA}; } QUICError @@ -264,6 +269,15 @@ QUICNetVConnection::handle_frame(std::shared_ptr<const QUICFrame> frame) QUICError error = QUICError(QUICErrorClass::NONE); switch (frame->type()) { + case QUICFrameType::MAX_DATA: + this->_remote_flow_controller->forward_limit(std::static_pointer_cast<const QUICMaxDataFrame>(frame)->maximum_data()); + Debug("quic_flow_ctrl", "Connection [%" PRIx64 "] [REMOTE] %" PRIu64 "/%" PRIu64, + static_cast<uint64_t>(this->_quic_connection_id), this->_remote_flow_controller->current_offset(), + this->_remote_flow_controller->current_limit()); + break; + case QUICFrameType::BLOCKED: + // BLOCKED frame is for debugging. Nothing to do here. 
+ break; case QUICFrameType::CONNECTION_CLOSE: DebugQUICCon("Enter state_connection_closed"); SET_HANDLER((NetVConnHandler)&QUICNetVConnection::state_connection_closed); @@ -358,8 +372,8 @@ QUICNetVConnection::state_handshake(int event, Event *data) if (this->_handshake_handler && this->_handshake_handler->is_completed()) { this->_application_map->set_default(this->_create_application()); - this->_stream_manager->init_flow_control_params(this->_handshake_handler->local_transport_parameters(), - this->_handshake_handler->remote_transport_parameters()); + this->_init_flow_control_params(this->_handshake_handler->local_transport_parameters(), + this->_handshake_handler->remote_transport_parameters()); DebugQUICCon("Enter state_connection_established"); SET_HANDLER((NetVConnHandler)&QUICNetVConnection::state_connection_established); @@ -507,6 +521,16 @@ QUICNetVConnection::_state_handshake_process_initial_client_packet(std::unique_p if (packet->has_valid_fnv1a_hash()) { bool should_send_ack; error = this->_frame_dispatcher->receive_frames(packet->payload(), packet->payload_size(), should_send_ack); + if (error.cls != QUICErrorClass::NONE) { + return error; + } + error = this->_local_flow_controller->update(this->_stream_manager->total_offset_received()); + Debug("quic_flow_ctrl", "Connection [%" PRIx64 "] [LOCAL] %" PRIu64 "/%" PRIu64, + static_cast<uint64_t>(this->_quic_connection_id), this->_local_flow_controller->current_offset(), + this->_local_flow_controller->current_limit()); + if (error.cls != QUICErrorClass::NONE) { + return error; + } } else { DebugQUICCon("Invalid FNV-1a hash value"); return QUICError(QUICErrorClass::QUIC_TRANSPORT, QUICErrorCode::CRYPTOGRAPHIC_ERROR); @@ -586,6 +610,13 @@ QUICNetVConnection::_state_common_send_packet() this->_packetize_frames(); QUICPacket *packet; + QUICError error = this->_remote_flow_controller->update(this->_stream_manager->total_offset_sent()); + Debug("quic_flow_ctrl", "Connection [%" PRIx64 "] [REMOTE] %" PRIu64 "/%" 
PRIu64, + static_cast<uint64_t>(this->_quic_connection_id), this->_remote_flow_controller->current_offset(), + this->_remote_flow_controller->current_limit()); + if (error.cls != QUICErrorClass::NONE) { + return error; + } while ((packet = this->_packet_send_queue.dequeue()) != nullptr) { this->_packet_handler->send_packet(*packet, this); this->_loss_detector->on_packet_sent( @@ -654,7 +685,21 @@ QUICError QUICNetVConnection::_recv_and_ack(const uint8_t *payload, uint16_t size, QUICPacketNumber packet_num) { bool should_send_ack; - QUICError error = this->_frame_dispatcher->receive_frames(payload, size, should_send_ack); + + QUICError error; + + error = this->_frame_dispatcher->receive_frames(payload, size, should_send_ack); + if (error.cls != QUICErrorClass::NONE) { + return error; + } + + error = this->_local_flow_controller->update(this->_stream_manager->total_offset_received()); + Debug("quic_flow_ctrl", "Connection [%" PRIx64 "] [LOCAL] %" PRIu64 "/%" PRIu64, static_cast<uint64_t>(this->_quic_connection_id), + this->_local_flow_controller->current_offset(), this->_local_flow_controller->current_limit()); + if (error.cls != QUICErrorClass::NONE) { + return error; + } + // this->_local_flow_controller->forward_limit(); this->_ack_frame_creator.update(packet_num, should_send_ack); std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> ack_frame = this->_ack_frame_creator.create_if_needed(); @@ -709,3 +754,27 @@ QUICNetVConnection::_create_application() return nullptr; } } + +void +QUICNetVConnection::_init_flow_control_params(const std::shared_ptr<const QUICTransportParameters> &local_tp, + const std::shared_ptr<const QUICTransportParameters> &remote_tp) +{ + this->_stream_manager->init_flow_control_params(local_tp, remote_tp); + + uint32_t local_initial_max_data = 0; + uint32_t remote_initial_max_data = 0; + if (local_tp) { + local_initial_max_data = local_tp->initial_max_data(); + } + if (remote_tp) { + remote_initial_max_data = remote_tp->initial_max_data(); + } + + 
this->_local_flow_controller->forward_limit(local_initial_max_data); + this->_remote_flow_controller->forward_limit(remote_initial_max_data); + Debug("quic_flow_ctrl", "Connection [%" PRIx64 "] [LOCAL] %" PRIu64 "/%" PRIu64, static_cast<uint64_t>(this->_quic_connection_id), + this->_local_flow_controller->current_offset(), this->_local_flow_controller->current_limit()); + Debug("quic_flow_ctrl", "Connection [%" PRIx64 "] [REMOTE] %" PRIu64 "/%" PRIu64, + static_cast<uint64_t>(this->_quic_connection_id), this->_remote_flow_controller->current_offset(), + this->_remote_flow_controller->current_limit()); +} diff --git a/iocore/net/quic/Makefile.am b/iocore/net/quic/Makefile.am index fda10cd..40d7638 100644 --- a/iocore/net/quic/Makefile.am +++ b/iocore/net/quic/Makefile.am @@ -49,6 +49,7 @@ libquic_a_SOURCES = \ QUICLossDetector.cc \ QUICStreamManager.cc \ QUICCongestionController.cc \ + QUICFlowController.cc \ QUICStreamState.cc \ QUICStream.cc \ QUICHandshake.cc \ diff --git a/iocore/net/quic/Mock.h b/iocore/net/quic/Mock.h index 54ee271..dd538e2 100644 --- a/iocore/net/quic/Mock.h +++ b/iocore/net/quic/Mock.h @@ -223,16 +223,20 @@ public: class MockQUICFrameTransmitter : public QUICFrameTransmitter { +public: void transmit_frame(std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> frame) override { + ++frameCount[static_cast<int>(frame->type())]; } uint32_t maximum_stream_frame_data_size() override { - return 1160; + return 1200; } + + int frameCount[256] = {0}; }; class MockQUICLossDetector : public QUICLossDetector @@ -289,7 +293,6 @@ public: return _totalFrameCount; } - bool is_recv_avail_more_than(uint64_t /* size */) override { return true; } void send_frame(std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> /* frame */) override { return; } private: diff --git a/iocore/net/quic/QUICFlowController.cc b/iocore/net/quic/QUICFlowController.cc new file mode 100644 index 0000000..ec277e5 --- /dev/null +++ b/iocore/net/quic/QUICFlowController.cc @@ -0,0 +1,122 @@ +/** 
@file + * + * A brief file description + * + * @section license License + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "QUICFlowController.h" +#include "QUICFrame.h" +#include "QUICFrameTransmitter.h" + +QUICOffset +QUICFlowController::current_offset() +{ + return this->_offset; +} + +QUICOffset +QUICFlowController::current_limit() +{ + return this->_limit; +} + +QUICError +QUICFlowController::update(QUICOffset offset) +{ + if (this->_offset <= offset) { + // Assume flow control is not initialized if the limit was 0 + if (this->_limit != 0 && offset > this->_limit) { + return QUICError(QUICErrorClass::QUIC_TRANSPORT, QUICErrorCode::QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA); + } + this->_offset = offset; + } + + return QUICError(QUICErrorClass::NONE); +} + +void +QUICFlowController::forward_limit(QUICOffset limit) +{ + // MAX_(STREAM_)DATA might be unordered due to delay + // Just ignore if the size was smaller than the last one + if (this->_limit > limit) { + return; + } + this->_limit = limit; +} + +void +QUICFlowController::set_threshold(uint64_t threshold) +{ + this->_threshold = threshold; +} + +QUICError +QUICRemoteFlowController::update(QUICOffset offset) +{ + QUICError error = 
QUICFlowController::update(offset); + + // Assume flow control is not initialized if the limit was 0 + if (this->_limit == 0) { + return error; + } + + // Send BLOCKED(_STREAM) frame + if (offset > this->_limit) { + this->_tx->transmit_frame(this->_create_frame()); + } + + return error; +} + +void +QUICLocalFlowController::forward_limit(QUICOffset offset) +{ + QUICFlowController::forward_limit(offset); + + // Send MAX_(STREAM_)DATA frame + if (this->_limit - this->_offset <= this->_threshold) { + this->_tx->transmit_frame(this->_create_frame()); + } +} + +std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> +QUICRemoteConnectionFlowController::_create_frame() +{ + return QUICFrameFactory::create_blocked_frame(); +} + +std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> +QUICLocalConnectionFlowController::_create_frame() +{ + return QUICFrameFactory::create_max_data_frame(this->_limit); +} + +std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> +QUICRemoteStreamFlowController::_create_frame() +{ + return QUICFrameFactory::create_stream_blocked_frame(this->_stream_id); +} + +std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> +QUICLocalStreamFlowController::_create_frame() +{ + return QUICFrameFactory::create_max_stream_data_frame(this->_stream_id, this->_limit); +} diff --git a/iocore/net/quic/QUICFlowController.h b/iocore/net/quic/QUICFlowController.h new file mode 100644 index 0000000..d9b9eff --- /dev/null +++ b/iocore/net/quic/QUICFlowController.h @@ -0,0 +1,106 @@ +/** @file + * + * A brief file description + * + * @section license License + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "QUICTypes.h" +#include "QUICFrame.h" + +class QUICFrameTransmitter; + +class QUICFlowController +{ +public: + QUICOffset current_offset(); + QUICOffset current_limit(); + virtual QUICError update(QUICOffset offset); + virtual void forward_limit(QUICOffset limit); + void set_threshold(uint64_t threshold); + +protected: + QUICFlowController(uint64_t initial_limit, QUICFrameTransmitter *tx) : _limit(initial_limit), _tx(tx) {} + virtual std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> _create_frame() = 0; + + QUICOffset _offset = 0; + QUICOffset _limit = 0; + QUICOffset _threshold = 1024; + QUICFrameTransmitter *_tx = nullptr; +}; + +class QUICRemoteFlowController : public QUICFlowController +{ +public: + QUICRemoteFlowController(uint64_t initial_limit, QUICFrameTransmitter *tx) : QUICFlowController(initial_limit, tx) {} + QUICError update(QUICOffset offset) override; +}; + +class QUICLocalFlowController : public QUICFlowController +{ +public: + QUICLocalFlowController(uint64_t initial_limit, QUICFrameTransmitter *tx) : QUICFlowController(initial_limit, tx) {} + void forward_limit(QUICOffset limit) override; +}; + +class QUICRemoteConnectionFlowController : public QUICRemoteFlowController +{ +public: + QUICRemoteConnectionFlowController(uint64_t initial_limit, QUICFrameTransmitter *tx) : QUICRemoteFlowController(initial_limit, tx) + { + } + std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> _create_frame() override; +}; + +class QUICLocalConnectionFlowController : public QUICLocalFlowController +{ +public: + 
QUICLocalConnectionFlowController(uint64_t initial_limit, QUICFrameTransmitter *tx) : QUICLocalFlowController(initial_limit, tx) + { + } + std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> _create_frame() override; +}; + +class QUICRemoteStreamFlowController : public QUICRemoteFlowController +{ +public: + QUICRemoteStreamFlowController(uint64_t initial_limit, QUICFrameTransmitter *tx, QUICStreamId stream_id) + : QUICRemoteFlowController(initial_limit, tx), _stream_id(stream_id) + { + } + std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> _create_frame() override; + +private: + QUICStreamId _stream_id = 0; +}; + +class QUICLocalStreamFlowController : public QUICLocalFlowController +{ +public: + QUICLocalStreamFlowController(uint64_t initial_limit, QUICFrameTransmitter *tx, QUICStreamId stream_id) + : QUICLocalFlowController(initial_limit, tx), _stream_id(stream_id) + { + } + std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> _create_frame() override; + +private: + QUICStreamId _stream_id = 0; +}; diff --git a/iocore/net/quic/QUICStream.cc b/iocore/net/quic/QUICStream.cc index c7f4615..f03dd8a 100644 --- a/iocore/net/quic/QUICStream.cc +++ b/iocore/net/quic/QUICStream.cc @@ -28,23 +28,21 @@ #include "QUICDebugNames.h" #include "QUICConfig.h" -#define DebugQUICStream(fmt, ...) \ - Debug("quic_stream", "[%" PRIx64 "] [%s] " fmt, static_cast<uint64_t>(this->_id), QUICDebugNames::stream_state(this->_state), \ - ##__VA_ARGS__) - -static constexpr uint64_t MAX_DATA_HEADSPACE = 10240; // in uints of octets -static constexpr uint64_t MAX_STREAM_DATA_HEADSPACE = 1024; +#define DebugQUICStream(fmt, ...) 
\ + Debug("quic_stream", "[%" PRIx32 "] [%s] " fmt, this->_id, QUICDebugNames::stream_state(this->_state), ##__VA_ARGS__) void QUICStream::init(QUICStreamManager *manager, QUICFrameTransmitter *tx, QUICStreamId id, uint64_t recv_max_stream_data, uint64_t send_max_stream_data) { - this->_stream_manager = manager; - this->_tx = tx; - this->_id = id; - init_flow_control_params(recv_max_stream_data, send_max_stream_data); + this->mutex = new_ProxyMutex(); + this->_stream_manager = manager; + this->_tx = tx; + this->_id = id; + this->_remote_flow_controller = new QUICRemoteStreamFlowController(send_max_stream_data, _tx, _id); + this->_local_flow_controller = new QUICLocalStreamFlowController(recv_max_stream_data, _tx, _id); + this->init_flow_control_params(recv_max_stream_data, send_max_stream_data); - this->mutex = new_ProxyMutex(); DebugQUICStream("Initialized"); } @@ -57,9 +55,15 @@ QUICStream::start() void QUICStream::init_flow_control_params(uint32_t recv_max_stream_data, uint32_t send_max_stream_data) { - this->_recv_max_stream_data = recv_max_stream_data; - this->_recv_max_stream_data_delta = recv_max_stream_data; - this->_send_max_stream_data = send_max_stream_data; + this->_flow_control_buffer_size = recv_max_stream_data; + this->_local_flow_controller->forward_limit(recv_max_stream_data); + this->_remote_flow_controller->forward_limit(send_max_stream_data); + Debug("quic_flow_ctrl", "Stream [%" PRIx32 "] [%s] [LOCAL] %" PRIu64 "/%" PRIu64, this->_id, + QUICDebugNames::stream_state(this->_state), this->_local_flow_controller->current_offset(), + this->_local_flow_controller->current_limit()); + Debug("quic_flow_ctrl", "Stream [%" PRIx32 "] [%s] [REMOTE] %" PRIu64 "/%" PRIu64, this->_id, + QUICDebugNames::stream_state(this->_state), this->_remote_flow_controller->current_offset(), + this->_remote_flow_controller->current_limit()); } QUICStreamId @@ -72,6 +76,7 @@ int QUICStream::main_event_handler(int event, void *data) { DebugQUICStream("%s", 
QUICDebugNames::vc_event(event)); + QUICError error; switch (event) { case VC_EVENT_READ_READY: @@ -83,7 +88,7 @@ QUICStream::main_event_handler(int event, void *data) } case VC_EVENT_WRITE_READY: case VC_EVENT_WRITE_COMPLETE: { - this->_send(); + error = this->_send(); this->_signal_write_event(true); this->_write_event = nullptr; @@ -101,6 +106,12 @@ QUICStream::main_event_handler(int event, void *data) ink_assert(false); } + if (error.cls != QUICErrorClass::NONE) { + // TODO Send error if needed + DebugQUICStream("QUICError: %s (%u), %s (0x%x)", QUICDebugNames::error_class(error.cls), static_cast<unsigned int>(error.cls), + QUICDebugNames::error_code(error.code), static_cast<unsigned int>(error.code)); + } + return EVENT_CONT; } @@ -247,6 +258,10 @@ QUICStream::_write_to_read_vio(const std::shared_ptr<const QUICStreamFrame> &fra int bytes_added = this->_read_vio.buffer.writer()->write(frame->data(), frame->data_length()); this->_read_vio.nbytes += bytes_added; this->_recv_offset += frame->data_length(); + this->_local_flow_controller->forward_limit(frame->offset() + this->_flow_control_buffer_size); + Debug("quic_flow_ctrl", "Stream [%" PRIx32 "] [%s] [LOCAL] %" PRIu64 "/%" PRIu64, this->_id, + QUICDebugNames::stream_state(this->_state), this->_local_flow_controller->current_offset(), + this->_local_flow_controller->current_limit()); } void @@ -266,24 +281,28 @@ QUICStream::_reorder_data() * which is called by application via do_io_read() or reenable(). 
*/ QUICError -QUICStream::recv(std::shared_ptr<const QUICStreamFrame> frame) +QUICStream::recv(const std::shared_ptr<const QUICStreamFrame> frame) { ink_assert(_id == frame->stream_id()); ink_assert(this->_read_vio.op == VIO::READ); + // Check stream state - Do this first, before accepting the frame if (!this->_state.is_allowed_to_receive(*frame)) { this->reset(); return QUICError(QUICErrorClass::QUIC_TRANSPORT, QUICErrorCode::QUIC_INTERNAL_ERROR); } this->_state.update_with_received_frame(*frame); - // Flow Control - QUICError error = this->_recv_flow_control(frame->offset()); + // Flow Control - Even if it's allowed to receive on the state, it may exceed the limit + QUICError error = this->_local_flow_controller->update(frame->offset() + frame->data_length()); + Debug("quic_flow_ctrl", "Stream [%" PRIx32 "] [%s] [LOCAL] %" PRIu64 "/%" PRIu64, this->_id, + QUICDebugNames::stream_state(this->_state), this->_local_flow_controller->current_offset(), + this->_local_flow_controller->current_limit()); if (error.cls != QUICErrorClass::NONE) { return error; } - // Reordering + // Reordering - Some frames may be delayed or be dropped if (this->_recv_offset > frame->offset()) { // Do nothing. Just ignore STREAM frame. 
return QUICError(QUICErrorClass::NONE); @@ -300,77 +319,31 @@ QUICStream::recv(std::shared_ptr<const QUICStreamFrame> frame) } QUICError -QUICStream::recv(const std::shared_ptr<const QUICMaxStreamDataFrame> &frame) +QUICStream::recv(const std::shared_ptr<const QUICMaxStreamDataFrame> frame) { - this->_send_max_stream_data += frame->maximum_stream_data(); + this->_remote_flow_controller->forward_limit(frame->maximum_stream_data()); + Debug("quic_flow_ctrl", "Stream [%" PRIx32 "] [%s] [REMOTE] %" PRIu64 "/%" PRIu64, this->_id, + QUICDebugNames::stream_state(this->_state), this->_remote_flow_controller->current_offset(), + this->_remote_flow_controller->current_limit()); return QUICError(QUICErrorClass::NONE); } QUICError -QUICStream::recv(const std::shared_ptr<const QUICStreamBlockedFrame> &frame) -{ - this->_slide_recv_max_stream_data(); - return QUICError(QUICErrorClass::NONE); -} - -void -QUICStream::_slide_recv_max_stream_data() -{ - // TODO: How much should this be increased? - this->_recv_max_stream_data += this->_recv_max_stream_data_delta; - this->_stream_manager->send_frame(QUICFrameFactory::create_max_stream_data_frame(this->_id, this->_recv_max_stream_data)); -} - -QUICError -QUICStream::_recv_flow_control(uint64_t new_offset) +QUICStream::recv(const std::shared_ptr<const QUICStreamBlockedFrame> frame) { - if (this->_recv_largest_offset > new_offset) { - return QUICError(QUICErrorClass::NONE); - } - - uint64_t delta = new_offset - this->_recv_largest_offset; - - Debug("quic_flow_ctrl", "Con: %" PRIu64 "/%" PRIu64 " Stream: %" PRIu64 "/%" PRIu64, - (this->_stream_manager->recv_total_offset() + delta) / 1024, this->_stream_manager->recv_max_data(), new_offset, - this->_recv_max_stream_data); - - // Connection Level Flow Control - if (this->_id != STREAM_ID_FOR_HANDSHAKE) { - if (!this->_stream_manager->is_recv_avail_more_than(delta)) { - return QUICError(QUICErrorClass::QUIC_TRANSPORT, QUICErrorCode::QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA); - } - - if 
(!this->_stream_manager->is_recv_avail_more_than(delta + MAX_DATA_HEADSPACE)) { - this->_stream_manager->slide_recv_max_data(); - } - - this->_stream_manager->add_recv_total_offset(delta); - } - - // Stream Level Flow Control - if (this->_recv_max_stream_data > 0) { - if (this->_recv_max_stream_data < new_offset) { - return QUICError(QUICErrorClass::QUIC_TRANSPORT, QUICErrorCode::QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA); - } - - if (this->_recv_max_stream_data < new_offset + MAX_STREAM_DATA_HEADSPACE) { - this->_slide_recv_max_stream_data(); - } - } - - this->_recv_largest_offset = new_offset; - + // STREAM_BLOCKED frames are for debugging. Nothing to do here. return QUICError(QUICErrorClass::NONE); } /** * @brief Send STREAM DATA from _response_buffer */ -void +QUICError QUICStream::_send() { SCOPED_MUTEX_LOCK(lock, this->_write_vio.mutex, this_ethread()); + QUICError error; IOBufferReader *reader = this->_write_vio.get_reader(); int64_t bytes_avail = reader->read_avail(); int64_t total_len = 0; @@ -386,7 +359,11 @@ QUICStream::_send() len = data_len; } - if (!this->_send_flow_control(len)) { + QUICError error = this->_remote_flow_controller->update(this->_send_offset + len); + Debug("quic_flow_ctrl", "Stream [%" PRIx32 "] [%s] [REMOTE] %" PRIu64 "/%" PRIu64, this->_id, + QUICDebugNames::stream_state(this->_state), this->_remote_flow_controller->current_offset(), + this->_remote_flow_controller->current_limit()); + if (error.cls != QUICErrorClass::NONE) { break; } @@ -406,34 +383,7 @@ QUICStream::_send() this->_stream_manager->send_frame(std::move(frame)); } - return; -} - -bool -QUICStream::_send_flow_control(uint64_t len) -{ - Debug("quic_flow_ctrl", "Con: %" PRIu64 "/%" PRIu64 " Stream: %" PRIu64 "/%" PRIu64, - (this->_stream_manager->send_total_offset() + len) / 1024, this->_stream_manager->send_max_data(), this->_send_offset + len, - this->_send_max_stream_data); - - // Stream Level Flow Control - // TODO: remove check of _send_max_stream_data when moved to 
Second Implementation completely - if (this->_send_max_stream_data > 0 && len > this->_send_max_stream_data) { - this->_stream_manager->send_frame(QUICFrameFactory::create_stream_blocked_frame(this->_id)); - - return false; - } - - // Connection Level Flow Control - if (this->_id != STREAM_ID_FOR_HANDSHAKE) { - if (!this->_stream_manager->is_send_avail_more_than(len)) { - this->_stream_manager->send_frame(QUICFrameFactory::create_blocked_frame()); - - return false; - } - } - - return true; + return error; } void @@ -447,3 +397,15 @@ QUICStream::is_read_ready() { return this->_read_vio.nbytes > 0; } + +QUICOffset +QUICStream::largest_offset_received() +{ + return this->_local_flow_controller->current_offset(); +} + +QUICOffset +QUICStream::largest_offset_sent() +{ + return this->_remote_flow_controller->current_offset(); +} diff --git a/iocore/net/quic/QUICStream.h b/iocore/net/quic/QUICStream.h index e3c6d9c..2f28c90 100644 --- a/iocore/net/quic/QUICStream.h +++ b/iocore/net/quic/QUICStream.h @@ -30,6 +30,7 @@ #include "QUICFrame.h" #include "QUICStreamState.h" +#include "QUICFlowController.h" class QUICFrameTransmitter; class QUICStreamState; @@ -59,20 +60,23 @@ public: void do_io_shutdown(ShutdownHowTo_t howto) override; void reenable(VIO *vio) override; - QUICError recv(std::shared_ptr<const QUICStreamFrame> frame); - QUICError recv(const std::shared_ptr<const QUICMaxStreamDataFrame> &frame); - QUICError recv(const std::shared_ptr<const QUICStreamBlockedFrame> &frame); + QUICError recv(const std::shared_ptr<const QUICStreamFrame> frame); + QUICError recv(const std::shared_ptr<const QUICMaxStreamDataFrame> frame); + QUICError recv(const std::shared_ptr<const QUICStreamBlockedFrame> frame); void reset(); bool is_read_ready(); + QUICOffset largest_offset_received(); + QUICOffset largest_offset_sent(); + LINK(QUICStream, link); private: QUICStreamState _state; - void _send(); + QUICError _send(); void _write_to_read_vio(const std::shared_ptr<const QUICStreamFrame> 
&); void _reorder_data(); @@ -85,18 +89,13 @@ private: Event *_send_tracked_event(Event *event, int send_event, VIO *vio); - void _slide_recv_max_stream_data(); - QUICError _recv_flow_control(uint64_t new_offset); - bool _send_flow_control(uint64_t len); - - QUICStreamId _id = 0; - QUICOffset _recv_offset = 0; - QUICOffset _recv_largest_offset = 0; - QUICOffset _send_offset = 0; + QUICStreamId _id = 0; + QUICOffset _recv_offset = 0; + QUICOffset _send_offset = 0; - uint64_t _recv_max_stream_data = 0; - uint64_t _recv_max_stream_data_delta = 0; - uint64_t _send_max_stream_data = 0; + QUICRemoteStreamFlowController *_remote_flow_controller; + QUICLocalStreamFlowController *_local_flow_controller; + uint64_t _flow_control_buffer_size = 1024; VIO _read_vio; VIO _write_vio; diff --git a/iocore/net/quic/QUICStreamManager.cc b/iocore/net/quic/QUICStreamManager.cc index 6b62ebd..035da95 100644 --- a/iocore/net/quic/QUICStreamManager.cc +++ b/iocore/net/quic/QUICStreamManager.cc @@ -39,8 +39,9 @@ QUICStreamManager::QUICStreamManager(QUICFrameTransmitter *tx, QUICApplicationMa std::vector<QUICFrameType> QUICStreamManager::interests() { - return {QUICFrameType::STREAM, QUICFrameType::RST_STREAM, QUICFrameType::MAX_DATA, QUICFrameType::MAX_STREAM_DATA, - QUICFrameType::BLOCKED}; + return { + QUICFrameType::STREAM, QUICFrameType::RST_STREAM, QUICFrameType::MAX_STREAM_DATA, + }; } void @@ -50,13 +51,19 @@ QUICStreamManager::init_flow_control_params(const std::shared_ptr<const QUICTran this->_local_tp = local_tp; this->_remote_tp = remote_tp; - // Connection level - this->_recv_max_data = QUICMaximumData(local_tp->initial_max_data()); - this->_send_max_data = QUICMaximumData(remote_tp->initial_max_data()); - // Setup a stream for Handshake QUICStream *stream = this->_find_stream(STREAM_ID_FOR_HANDSHAKE); - stream->init_flow_control_params(local_tp->initial_max_stream_data(), remote_tp->initial_max_stream_data()); + if (stream) { + uint32_t local_initial_max_stream_data = 0; + 
uint32_t remote_initial_max_stream_data = 0; + if (this->_local_tp) { + local_initial_max_stream_data = local_tp->initial_max_stream_data(); + } + if (this->_remote_tp) { + remote_initial_max_stream_data = remote_tp->initial_max_stream_data(); + } + stream->init_flow_control_params(local_initial_max_stream_data, remote_initial_max_stream_data); + } } QUICError @@ -65,19 +72,12 @@ QUICStreamManager::handle_frame(std::shared_ptr<const QUICFrame> frame) QUICError error = QUICError(QUICErrorClass::NONE); switch (frame->type()) { - case QUICFrameType::MAX_DATA: { - error = this->_handle_frame(std::dynamic_pointer_cast<const QUICMaxDataFrame>(frame)); - break; - } - case QUICFrameType::BLOCKED: { - this->slide_recv_max_data(); - break; - } case QUICFrameType::MAX_STREAM_DATA: { error = this->_handle_frame(std::dynamic_pointer_cast<const QUICMaxStreamDataFrame>(frame)); break; } case QUICFrameType::STREAM_BLOCKED: { + // STREAM_BLOCKED frame is for debugging. Just propagate to streams error = this->_handle_frame(std::dynamic_pointer_cast<const QUICStreamBlockedFrame>(frame)); break; } @@ -94,21 +94,6 @@ QUICStreamManager::handle_frame(std::shared_ptr<const QUICFrame> frame) } QUICError -QUICStreamManager::_handle_frame(const std::shared_ptr<const QUICMaxDataFrame> &frame) -{ - this->_send_max_data = frame->maximum_data(); - return QUICError(QUICErrorClass::NONE); -} - -void -QUICStreamManager::slide_recv_max_data() -{ - // TODO: How much should this be increased? 
- this->_recv_max_data += this->_local_tp->initial_max_data(); - this->send_frame(QUICFrameFactory::create_max_data_frame(this->_recv_max_data)); -} - -QUICError QUICStreamManager::_handle_frame(const std::shared_ptr<const QUICMaxStreamDataFrame> &frame) { QUICStream *stream = this->_find_stream(frame->stream_id()); @@ -159,9 +144,7 @@ QUICStreamManager::_handle_frame(const std::shared_ptr<const QUICStreamFrame> &f void QUICStreamManager::send_frame(std::unique_ptr<QUICStreamFrame, QUICFrameDeleterFunc> frame) { - // XXX The offset of sending frame is always largest offset by sending side if (frame->stream_id() != STREAM_ID_FOR_HANDSHAKE) { - this->_send_total_offset += frame->size(); } this->_tx->transmit_frame(std::move(frame)); @@ -179,24 +162,6 @@ QUICStreamManager::send_frame(std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> f return; } -bool -QUICStreamManager::is_send_avail_more_than(uint64_t size) -{ - return this->_send_max_data > (this->_send_total_offset + size); -} - -bool -QUICStreamManager::is_recv_avail_more_than(uint64_t size) -{ - return this->_recv_max_data > (this->_recv_total_offset + size); -} - -void -QUICStreamManager::add_recv_total_offset(uint64_t delta) -{ - this->_recv_total_offset += delta; -} - QUICStream * QUICStreamManager::_find_stream(QUICStreamId id) { @@ -217,7 +182,7 @@ QUICStreamManager::_find_or_create_stream(QUICStreamId stream_id) stream = THREAD_ALLOC_INIT(quicStreamAllocator, this_ethread()); if (stream_id == STREAM_ID_FOR_HANDSHAKE) { // XXX rece/send max_stream_data are going to be set by init_flow_control_params() - stream->init(this, this->_tx, stream_id); + stream->init(this, this->_tx, stream_id, this->_local_tp->initial_max_stream_data()); } else { const QUICTransportParameters &local_tp = *this->_local_tp; const QUICTransportParameters &remote_tp = *this->_remote_tp; @@ -234,25 +199,29 @@ QUICStreamManager::_find_or_create_stream(QUICStreamId stream_id) } uint64_t -QUICStreamManager::recv_max_data() const 
+QUICStreamManager::total_offset_received() const { - return this->_recv_max_data; -} + uint64_t total_offset_received = 0; -uint64_t -QUICStreamManager::send_max_data() const -{ - return this->_send_max_data; + // FIXME Iterating all (open + closed) streams is expensive + for (QUICStream *s = this->stream_list.head; s; s = s->link.next) { + if (s->id() != 0) { + total_offset_received += s->largest_offset_received(); + } + } + return total_offset_received; } uint64_t -QUICStreamManager::recv_total_offset() const +QUICStreamManager::total_offset_sent() const { - return this->_recv_total_offset; -} + uint64_t total_offset_sent = 0; -uint64_t -QUICStreamManager::send_total_offset() const -{ - return this->_send_total_offset; + // FIXME Iterating all (open + closed) streams is expensive + for (QUICStream *s = this->stream_list.head; s; s = s->link.next) { + if (s->id() != 0) { + total_offset_sent += s->largest_offset_sent(); + } + } + return this->_total_offset_sent; } diff --git a/iocore/net/quic/QUICStreamManager.h b/iocore/net/quic/QUICStreamManager.h index 88997b5..605496f 100644 --- a/iocore/net/quic/QUICStreamManager.h +++ b/iocore/net/quic/QUICStreamManager.h @@ -41,16 +41,10 @@ public: virtual QUICError handle_frame(std::shared_ptr<const QUICFrame>) override; virtual void send_frame(std::unique_ptr<QUICFrame, QUICFrameDeleterFunc> frame); virtual void send_frame(std::unique_ptr<QUICStreamFrame, QUICFrameDeleterFunc> frame); - virtual bool is_send_avail_more_than(uint64_t size); - virtual bool is_recv_avail_more_than(uint64_t size); - void add_recv_total_offset(uint64_t delta); - void slide_recv_max_data(); void init_flow_control_params(const std::shared_ptr<const QUICTransportParameters> &local_tp, const std::shared_ptr<const QUICTransportParameters> &remote_tp); - uint64_t recv_max_data() const; - uint64_t send_max_data() const; - uint64_t recv_total_offset() const; - uint64_t send_total_offset() const; + uint64_t total_offset_received() const; + uint64_t 
total_offset_sent() const; DLL<QUICStream> stream_list; @@ -72,6 +66,6 @@ private: // TODO: Maximum Data is in units of 1024 octets, but those total offset are in units of octets. // Add new uint16_t fields for remainder and treat those total offset in units of 1024 octets if needed - uint64_t _recv_total_offset = 0; - uint64_t _send_total_offset = 0; + uint64_t _total_offset_received = 0; + uint64_t _total_offset_sent = 0; }; diff --git a/iocore/net/quic/test/Makefile.am b/iocore/net/quic/test/Makefile.am index 65f473c..dc6e12f 100644 --- a/iocore/net/quic/test/Makefile.am +++ b/iocore/net/quic/test/Makefile.am @@ -29,7 +29,8 @@ check_PROGRAMS = \ test_QUICLossDetector \ test_QUICTypeUtil \ test_QUICAckFrameCreator \ - test_QUICVersionNegotiator + test_QUICVersionNegotiator \ + test_QUICFlowController AM_CPPFLAGS += \ @@ -152,6 +153,7 @@ test_QUICFrameDispatcher_SOURCES = \ ../QUICStreamManager.cc \ ../QUICApplicationMap.cc \ ../QUICCongestionController.cc \ + ../QUICFlowController.cc \ ../QUICLossDetector.cc \ ../QUICFrame.cc \ ../QUICPacket.cc \ @@ -252,6 +254,7 @@ test_QUICTransportParameters_SOURCES = \ ../QUICStream.cc \ ../QUICStreamState.cc \ ../QUICStreamManager.cc \ + ../QUICFlowController.cc \ ../QUICPacket.cc \ ../QUICFrame.cc \ ../QUICDebugNames.cc \ @@ -397,6 +400,7 @@ test_QUICVersionNegotiator_SOURCES = \ ../QUICStream.cc \ ../QUICStreamState.cc \ ../QUICStreamManager.cc \ + ../QUICFlowController.cc \ ../QUICFrame.cc \ ../QUICDebugNames.cc \ ../QUICVersionNegotiator.cc \ @@ -404,6 +408,35 @@ test_QUICVersionNegotiator_SOURCES = \ ../QUICConfig.cc \ ../../SSLNextProtocolSet.cc +# +# test_QUICFlowController +# +test_QUICFlowController_CPPFLAGS = \ + $(AM_CPPFLAGS) + +test_QUICFlowController_LDFLAGS = \ + @AM_LDFLAGS@ + +test_QUICFlowController_LDADD = \ + $(top_builddir)/lib/ts/libtsutil.la \ + $(top_builddir)/iocore/eventsystem/libinkevent.a \ + $(top_builddir)/lib/records/librecords_p.a \ + $(top_builddir)/mgmt/libmgmt_p.la \ + 
$(top_builddir)/lib/ts/libtsutil.la \ + $(top_builddir)/proxy/shared/libUglyLogStubs.a \ + @LIBTCL@ \ + @HWLOC_LIBS@ + +test_QUICFlowController_SOURCES = \ + main.cc \ + test_QUICFlowController.cc \ + ../QUICFlowController.cc \ + ../QUICTypes.cc \ + ../QUICPacket.cc \ + ../QUICCrypto.cc \ + $(QUICCrypto_impl) \ + ../QUICFrame.cc + include $(top_srcdir)/build/tidy.mk tidy-local: $(DIST_SOURCES) diff --git a/iocore/net/quic/test/test_QUICFlowController.cc b/iocore/net/quic/test/test_QUICFlowController.cc new file mode 100644 index 0000000..660cad6 --- /dev/null +++ b/iocore/net/quic/test/test_QUICFlowController.cc @@ -0,0 +1,252 @@ +/** @file + * + * A brief file description + * + * @section license License + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "catch.hpp" + +#include "quic/QUICFlowController.h" +#include "quic/Mock.h" +#include <memory> + +TEST_CASE("QUICFlowController_Local_Connection", "[quic]") +{ + QUICError error; + MockQUICFrameTransmitter tx; + QUICLocalConnectionFlowController fc(1024, &tx); + + // Check initial state + CHECK(fc.current_offset() == 0); + CHECK(fc.current_limit() == 1024); + + error = fc.update(256); + CHECK(fc.current_offset() == 256); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + error = fc.update(512); + CHECK(fc.current_offset() == 512); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + // Retransmit + error = fc.update(512); + CHECK(fc.current_offset() == 512); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + error = fc.update(1024); + CHECK(fc.current_offset() == 1024); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + // Delay + error = fc.update(512); + CHECK(fc.current_offset() == 1024); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + // Exceed limit + error = fc.update(1280); + CHECK(fc.current_offset() == 1024); + CHECK(fc.current_limit() == 1024); + CHECK(error.code == QUICErrorCode::QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA); + + // MAX_STREAM_DATA + CHECK(tx.frameCount[static_cast<int>(QUICFrameType::MAX_DATA)] == 0); + fc.forward_limit(2048); + CHECK(fc.current_offset() == 1024); + CHECK(fc.current_limit() == 2048); + CHECK(tx.frameCount[static_cast<int>(QUICFrameType::MAX_DATA)] == 1); + + error = fc.update(1280); + CHECK(fc.current_offset() == 1280); + CHECK(fc.current_limit() == 2048); + CHECK(error.cls == QUICErrorClass::NONE); +} + +TEST_CASE("QUICFlowController_Remote_Connection", "[quic]") +{ + QUICError error; + MockQUICFrameTransmitter tx; + QUICRemoteConnectionFlowController fc(1024, &tx); + + // Check initial state + CHECK(fc.current_offset() == 0); + 
CHECK(fc.current_limit() == 1024); + + error = fc.update(256); + CHECK(fc.current_offset() == 256); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + error = fc.update(512); + CHECK(fc.current_offset() == 512); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + // Retransmit + error = fc.update(512); + CHECK(fc.current_offset() == 512); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + error = fc.update(1024); + CHECK(fc.current_offset() == 1024); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + // Delay + error = fc.update(512); + CHECK(fc.current_offset() == 1024); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + // Exceed limit + CHECK(tx.frameCount[static_cast<int>(QUICFrameType::BLOCKED)] == 0); + error = fc.update(1280); + CHECK(fc.current_offset() == 1024); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls != QUICErrorClass::NONE); + CHECK(tx.frameCount[static_cast<int>(QUICFrameType::BLOCKED)] == 1); + + // MAX_STREAM_DATA + fc.forward_limit(2048); + CHECK(fc.current_offset() == 1024); + CHECK(fc.current_limit() == 2048); + + error = fc.update(1280); + CHECK(fc.current_offset() == 1280); + CHECK(fc.current_limit() == 2048); + CHECK(error.cls == QUICErrorClass::NONE); +} + +TEST_CASE("QUICFlowController_Local_Stream", "[quic]") +{ + QUICError error; + MockQUICFrameTransmitter tx; + QUICLocalStreamFlowController fc(1024, &tx, 0); + + // Check initial state + CHECK(fc.current_offset() == 0); + CHECK(fc.current_limit() == 1024); + + error = fc.update(256); + CHECK(fc.current_offset() == 256); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + error = fc.update(512); + CHECK(fc.current_offset() == 512); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + // Retransmit + error = fc.update(512); + 
CHECK(fc.current_offset() == 512); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + error = fc.update(1024); + CHECK(fc.current_offset() == 1024); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + // Delay + error = fc.update(512); + CHECK(fc.current_offset() == 1024); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + // Exceed limit + error = fc.update(1280); + CHECK(fc.current_offset() == 1024); + CHECK(fc.current_limit() == 1024); + CHECK(error.code == QUICErrorCode::QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA); + + // MAX_STREAM_DATA + CHECK(tx.frameCount[static_cast<int>(QUICFrameType::MAX_STREAM_DATA)] == 0); + fc.forward_limit(2048); + CHECK(fc.current_offset() == 1024); + CHECK(fc.current_limit() == 2048); + CHECK(tx.frameCount[static_cast<int>(QUICFrameType::MAX_STREAM_DATA)] == 1); + + error = fc.update(1280); + CHECK(fc.current_offset() == 1280); + CHECK(fc.current_limit() == 2048); + CHECK(error.cls == QUICErrorClass::NONE); +} + +TEST_CASE("QUICFlowController_Remote_Stream", "[quic]") +{ + QUICError error; + MockQUICFrameTransmitter tx; + QUICRemoteStreamFlowController fc(1024, &tx, 0); + + // Check initial state + CHECK(fc.current_offset() == 0); + CHECK(fc.current_limit() == 1024); + + error = fc.update(256); + CHECK(fc.current_offset() == 256); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + error = fc.update(512); + CHECK(fc.current_offset() == 512); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + // Retransmit + error = fc.update(512); + CHECK(fc.current_offset() == 512); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + error = fc.update(1024); + CHECK(fc.current_offset() == 1024); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + // Delay + error = fc.update(512); + CHECK(fc.current_offset() == 1024); + 
CHECK(fc.current_limit() == 1024); + CHECK(error.cls == QUICErrorClass::NONE); + + // Exceed limit + CHECK(tx.frameCount[static_cast<int>(QUICFrameType::STREAM_BLOCKED)] == 0); + error = fc.update(1280); + CHECK(fc.current_offset() == 1024); + CHECK(fc.current_limit() == 1024); + CHECK(error.cls != QUICErrorClass::NONE); + CHECK(tx.frameCount[static_cast<int>(QUICFrameType::STREAM_BLOCKED)] == 1); + + // MAX_STREAM_DATA + fc.forward_limit(2048); + CHECK(fc.current_offset() == 1024); + CHECK(fc.current_limit() == 2048); + + error = fc.update(1280); + CHECK(fc.current_offset() == 1280); + CHECK(fc.current_limit() == 2048); + CHECK(error.cls == QUICErrorClass::NONE); +} diff --git a/iocore/net/quic/test/test_QUICStream.cc b/iocore/net/quic/test/test_QUICStream.cc index e296621..99563bc 100644 --- a/iocore/net/quic/test/test_QUICStream.cc +++ b/iocore/net/quic/test/test_QUICStream.cc @@ -47,9 +47,10 @@ TEST_CASE("QUICStream_assembling_byte_stream_1", "[quic]") { MIOBuffer *read_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_4K); IOBufferReader *reader = read_buffer->alloc_reader(); + MockQUICFrameTransmitter tx; std::unique_ptr<QUICStream> stream(new QUICStream()); - stream->init(manager, nullptr, stream_id, 1024, 1024); + stream->init(manager, &tx, stream_id, 1024, 1024); stream->do_io_read(nullptr, 0, read_buffer); stream->recv(frame_1); @@ -73,9 +74,10 @@ TEST_CASE("QUICStream_assembling_byte_stream_2", "[quic]") { MIOBuffer *read_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_4K); IOBufferReader *reader = read_buffer->alloc_reader(); + MockQUICFrameTransmitter tx; std::unique_ptr<QUICStream> stream(new QUICStream()); - stream->init(manager, nullptr, stream_id); + stream->init(manager, &tx, stream_id); stream->do_io_read(nullptr, 0, read_buffer); stream->recv(frame_8); @@ -99,9 +101,10 @@ TEST_CASE("QUICStream_assembling_byte_stream_3", "[quic]") { MIOBuffer *read_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_4K); IOBufferReader *reader = read_buffer->alloc_reader(); + 
MockQUICFrameTransmitter tx; std::unique_ptr<QUICStream> stream(new QUICStream()); - stream->init(manager, nullptr, stream_id); + stream->init(manager, &tx, stream_id); stream->do_io_read(nullptr, 0, read_buffer); stream->recv(frame_8); -- To stop receiving notification emails like this one, please contact "commits@trafficserver.apache.org" <commits@trafficserver.apache.org>.