This is an automated email from the ASF dual-hosted git repository. masaori pushed a commit to branch quic-latest in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/quic-latest by this push: new c327d2a Transfer huge response using HttpTunnel flow control mechanism c327d2a is described below commit c327d2a40434a1a925a01cbb68de8a43bdfcb597 Author: Masaori Koshiba <masa...@apache.org> AuthorDate: Wed Dec 6 10:13:59 2017 +0900 Transfer huge response using HttpTunnel flow control mechanism --- iocore/net/quic/QUICApplication.cc | 24 +++++++++++++++++++++--- iocore/net/quic/QUICApplication.h | 8 ++------ iocore/net/quic/QUICStream.cc | 16 +++++++++++++++- proxy/hq/HQClientSession.cc | 12 ++++++++++++ proxy/hq/HQClientSession.h | 1 + proxy/hq/HQClientTransaction.cc | 16 ++++++++++++++++ proxy/hq/QUICSimpleApp.cc | 5 ++++- 7 files changed, 71 insertions(+), 11 deletions(-) diff --git a/iocore/net/quic/QUICApplication.cc b/iocore/net/quic/QUICApplication.cc index 5d5a0ed..eca747f 100644 --- a/iocore/net/quic/QUICApplication.cc +++ b/iocore/net/quic/QUICApplication.cc @@ -71,10 +71,22 @@ QUICStreamIO::write(IOBufferReader *r, int64_t alen, int64_t offset) { SCOPED_MUTEX_LOCK(lock, this->_write_vio->mutex, this_ethread()); - int64_t bytes_add = this->_write_buffer->write(r, alen, offset); - this->_write_vio->nbytes += bytes_add; + if (this->_write_buffer->write_avail() > 0) { + int64_t bytes_add = this->_write_buffer->write(r, alen, offset); - return bytes_add; + return bytes_add; + + } else { + Debug(tag, "write buffer is full"); + + return 0; + } +} + +void +QUICStreamIO::set_write_vio_nbytes(int64_t nbytes) +{ + this->_write_vio->nbytes += nbytes; } void @@ -101,6 +113,12 @@ QUICStreamIO::shutdown() return this->_stream->shutdown(); } +uint32_t +QUICStreamIO::get_transaction_id() const +{ + return this->_stream->id(); +} + // // QUICApplication // diff --git a/iocore/net/quic/QUICApplication.h b/iocore/net/quic/QUICApplication.h index 8094a4e..17e32e9 100644 --- a/iocore/net/quic/QUICApplication.h +++ b/iocore/net/quic/QUICApplication.h @@ -43,16 +43,12 @@ public: int64_t read(uint8_t 
*buf, int64_t len); int64_t write(const uint8_t *buf, int64_t len); int64_t write(IOBufferReader *r, int64_t len = INT64_MAX, int64_t offset = 0); + void set_write_vio_nbytes(int64_t); void read_reenable(); void write_reenable(); IOBufferReader *get_read_buffer_reader(); void shutdown(); - - int - get_transaction_id() const - { - return _stream->id(); - } + uint32_t get_transaction_id() const; private: QUICStream *_stream = nullptr; diff --git a/iocore/net/quic/QUICStream.cc b/iocore/net/quic/QUICStream.cc index 8c9822d..0dc5cf0 100644 --- a/iocore/net/quic/QUICStream.cc +++ b/iocore/net/quic/QUICStream.cc @@ -101,6 +101,10 @@ QUICStream::main_event_handler(int event, void *data) this->_signal_write_event(true); this->_write_event = nullptr; + QUICStreamDebug("wvio.nbytes=%" PRId64 " wvio.ndone=%" PRId64 " wvio.read_avail=%" PRId64 " wvio.write_avail=%" PRId64, + this->_write_vio.nbytes, this->_write_vio.ndone, this->_write_vio.get_reader()->read_avail(), + this->_write_vio.get_writer()->write_avail()); + break; } case VC_EVENT_EOS: @@ -257,6 +261,11 @@ QUICStream::_signal_read_event(bool direct) void QUICStream::_signal_write_event(bool direct) { + if (this->_write_vio.get_writer()->write_avail() == 0) { + QUICStreamDebug("wvio.write_avail=0"); + return; + } + int event = (this->_write_vio.ntodo() == 0) ? 
VC_EVENT_WRITE_COMPLETE : VC_EVENT_WRITE_READY; Continuation *cont = this->_write_vio._cont; @@ -334,7 +343,11 @@ QUICStream::recv(const std::shared_ptr<const QUICMaxStreamDataFrame> frame) QUICStreamFCDebug("[REMOTE] %" PRIu64 "/%" PRIu64, this->_remote_flow_controller->current_offset(), this->_remote_flow_controller->current_limit()); - this->reenable(&this->_write_vio); + // restart sending + QUICStreamDebug("restart sending"); + + this->_send(); + this->_signal_write_event(false); return QUICErrorUPtr(new QUICNoError()); } @@ -348,6 +361,7 @@ QUICStream::recv(const std::shared_ptr<const QUICStreamBlockedFrame> frame) /** * @brief Send STREAM DATA from _response_buffer + * @detail Call _signal_write_event() to indicate event upper layer */ QUICErrorUPtr QUICStream::_send() diff --git a/proxy/hq/HQClientSession.cc b/proxy/hq/HQClientSession.cc index 91b0035..af4d988 100644 --- a/proxy/hq/HQClientSession.cc +++ b/proxy/hq/HQClientSession.cc @@ -143,3 +143,15 @@ HQClientSession::add_transaction(HQClientTransaction *trans) this->_transaction_list.enqueue(trans); return; } + +// this->_transaction_list should be map? 
+HQClientTransaction * +HQClientSession::get_transaction(QUICStreamId id) +{ + for (HQClientTransaction *t = this->_transaction_list.head; t; t = static_cast<HQClientTransaction *>(t->link.next)) { + if (t->get_transaction_id() == static_cast<int>(id)) { + return t; + } + } + return nullptr; +} diff --git a/proxy/hq/HQClientSession.h b/proxy/hq/HQClientSession.h index 964be0a..ccbae3d 100644 --- a/proxy/hq/HQClientSession.h +++ b/proxy/hq/HQClientSession.h @@ -54,6 +54,7 @@ public: // HQClientSession specific methods void add_transaction(HQClientTransaction *); + HQClientTransaction *get_transaction(QUICStreamId); private: NetVConnection *_client_vc = nullptr; diff --git a/proxy/hq/HQClientTransaction.cc b/proxy/hq/HQClientTransaction.cc index b30522e..cc3a255 100644 --- a/proxy/hq/HQClientTransaction.cc +++ b/proxy/hq/HQClientTransaction.cc @@ -28,6 +28,10 @@ #include "HQClientSession.h" #include "HttpSM.h" +// XXX this->parent->connection_id() is Session ID of HQClientSession. Should this be QUIC Connection ID? +#define HQTransDebug(fmt, ...) 
\ + Debug("hq_trans", "[%" PRId64 "] [%" PRIx32 "] " fmt, this->parent->connection_id(), this->get_transaction_id(), ##__VA_ARGS__) + static void dump_io_buffer(IOBufferReader *reader) { @@ -110,6 +114,11 @@ HQClientTransaction::main_event_handler(int event, void *edata) if (this->_write_vio.get_reader()->read_avail()) { this->_write_response(); } + + HQTransDebug("wvio.nbytes=%" PRId64 " wvio.ndone=%" PRId64 " wvio.read_avail=%" PRId64 " wvio.write_avail=%" PRId64, + this->_write_vio.nbytes, this->_write_vio.ndone, this->_write_vio.get_reader()->read_avail(), + this->_write_vio.get_writer()->write_avail()); + break; } default: @@ -241,6 +250,8 @@ static constexpr char http_1_1_version[] = "HTTP/1.1"; void HQClientTransaction::_write_response() { + SCOPED_MUTEX_LOCK(lock, this->_write_vio.mutex, this_ethread()); + IOBufferReader *reader = this->_write_vio.get_reader(); if (memcmp(reader->start(), http_1_1_version, sizeof(http_1_1_version) - 1) == 0) { @@ -249,6 +260,8 @@ HQClientTransaction::_write_response() int64_t headers_size = headers->read_avail(); reader->consume(headers_size); this->_write_vio.ndone += headers_size; + // The size of respons to client + this->_stream_io->set_write_vio_nbytes(this->_write_vio.nbytes - headers_size); } // Write HTTP/1.1 response body @@ -256,6 +269,9 @@ HQClientTransaction::_write_response() int64_t total_written = 0; while (total_written < bytes_avail) { int64_t bytes_written = this->_stream_io->write(reader, bytes_avail); + if (bytes_written == 0) { + break; + } reader->consume(bytes_written); this->_write_vio.ndone += bytes_written; total_written += bytes_written; diff --git a/proxy/hq/QUICSimpleApp.cc b/proxy/hq/QUICSimpleApp.cc index f7424d1..a245642 100644 --- a/proxy/hq/QUICSimpleApp.cc +++ b/proxy/hq/QUICSimpleApp.cc @@ -76,7 +76,10 @@ QUICSimpleApp::main_event_handler(int event, Event *data) } case VC_EVENT_WRITE_READY: case VC_EVENT_WRITE_COMPLETE: { - // Nothing to do + HQClientTransaction *trans = 
this->_client_session->get_transaction(stream->id()); + + trans->handleEvent(event); + break; } case VC_EVENT_EOS: -- To stop receiving notification emails like this one, please contact "commits@trafficserver.apache.org" <commits@trafficserver.apache.org>.