This is an automated email from the ASF dual-hosted git repository.
masaori pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new c55001b Avoid unnecessary copy on POST request over HTTP/2
c55001b is described below
commit c55001bec19d4db63f58b484186b942fbec5ae2a
Author: Masaori Koshiba <[email protected]>
AuthorDate: Thu Sep 26 15:46:05 2019 +0900
Avoid unnecessary copy on POST request over HTTP/2
---
proxy/http2/Http2ConnectionState.cc | 19 +++++--
proxy/http2/Http2Stream.cc | 105 +++++++++++++++++++-----------------
proxy/http2/Http2Stream.h | 14 ++++-
3 files changed, 83 insertions(+), 55 deletions(-)
diff --git a/proxy/http2/Http2ConnectionState.cc
b/proxy/http2/Http2ConnectionState.cc
index 3e40f18..eb23143 100644
--- a/proxy/http2/Http2ConnectionState.cc
+++ b/proxy/http2/Http2ConnectionState.cc
@@ -160,6 +160,11 @@ rcv_data_frame(Http2ConnectionState &cstate, const
Http2Frame &frame)
stream->decrement_server_rwnd(payload_length);
const uint32_t unpadded_length = payload_length - pad_length;
+ MIOBuffer *writer = stream->read_vio_writer();
+ if (writer == nullptr) {
+ return Http2Error(Http2ErrorClass::HTTP2_ERROR_CLASS_STREAM,
Http2ErrorCode::HTTP2_ERROR_INTERNAL_ERROR);
+ }
+
// If we call write() multiple times, we must keep the same reader, so we can
// update its offset via consume. Otherwise, we will read the same data on
the
// second time through
@@ -168,18 +173,24 @@ rcv_data_frame(Http2ConnectionState &cstate, const
Http2Frame &frame)
if (frame.header().flags & HTTP2_FLAGS_DATA_PADDED) {
myreader->consume(HTTP2_DATA_PADLEN_LEN);
}
- while (nbytes < payload_length - pad_length) {
+
+ while (nbytes < unpadded_length) {
size_t read_len =
BUFFER_SIZE_FOR_INDEX(buffer_size_index[HTTP2_FRAME_TYPE_DATA]);
if (nbytes + read_len > unpadded_length) {
read_len -= nbytes + read_len - unpadded_length;
}
- nbytes += stream->request_buffer.write(myreader, read_len);
+ nbytes += writer->write(myreader, read_len);
myreader->consume(nbytes);
- // If there is an outstanding read, update the buffer
- stream->update_read_request(INT64_MAX, true);
}
myreader->writer()->dealloc_reader(myreader);
+ if (frame.header().flags & HTTP2_FLAGS_DATA_END_STREAM) {
+ // TODO: set total written size to read_vio.nbytes
+ stream->signal_read_event(VC_EVENT_READ_COMPLETE);
+ } else {
+ stream->signal_read_event(VC_EVENT_READ_READY);
+ }
+
uint32_t initial_rwnd =
cstate.server_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
uint32_t min_rwnd = std::min(initial_rwnd,
cstate.server_settings.get(HTTP2_SETTINGS_MAX_FRAME_SIZE));
// Connection level WINDOW UPDATE
diff --git a/proxy/http2/Http2Stream.cc b/proxy/http2/Http2Stream.cc
index 8da3d86..ecbec35 100644
--- a/proxy/http2/Http2Stream.cc
+++ b/proxy/http2/Http2Stream.cc
@@ -52,7 +52,7 @@ Http2Stream::init(Http2StreamId sid, ssize_t initial_rwnd)
this->_thread = this_ethread();
this->_client_rwnd = initial_rwnd;
- _reader = request_reader = request_buffer.alloc_reader();
+ this->_reader = this->_request_buffer.alloc_reader();
// FIXME: Are you sure? every "stream" needs request_header?
_req_header.create(HTTP_TYPE_REQUEST);
response_header.create(HTTP_TYPE_RESPONSE);
@@ -195,21 +195,30 @@ Http2Stream::send_request(Http2ConnectionState &cstate)
do {
bufindex = 0;
tmp = dumpoffset;
- IOBufferBlock *block = request_buffer.get_current_block();
+ IOBufferBlock *block = this->_request_buffer.get_current_block();
if (!block) {
- request_buffer.add_block();
- block = request_buffer.get_current_block();
+ this->_request_buffer.add_block();
+ block = this->_request_buffer.get_current_block();
}
done = _req_header.print(block->start(), block->write_avail(), &bufindex,
&tmp);
dumpoffset += bufindex;
- request_buffer.fill(bufindex);
+ this->_request_buffer.fill(bufindex);
if (!done) {
- request_buffer.add_block();
+ this->_request_buffer.add_block();
}
} while (!done);
- // Is there a read_vio request waiting?
- this->update_read_request(INT64_MAX, true);
+ if (bufindex == 0) {
+ // No data to signal read event
+ return;
+ }
+
+ if (this->recv_end_stream) {
+ this->read_vio.nbytes = bufindex;
+ this->signal_read_event(VC_EVENT_READ_COMPLETE);
+ } else {
+ this->signal_read_event(VC_EVENT_READ_READY);
+ }
}
bool
@@ -330,9 +339,7 @@ Http2Stream::do_io_read(Continuation *c, int64_t nbytes,
MIOBuffer *buf)
read_vio.vc_server = this;
read_vio.op = VIO::READ;
- // Is there already data in the request_buffer? If so, copy it over and then
- // schedule a READ_READY or READ_COMPLETE event after we return.
- update_read_request(nbytes, false, true);
+ // TODO: re-enable read_vio
return &read_vio;
}
@@ -523,44 +530,26 @@ Http2Stream::update_read_request(int64_t read_len, bool
call_update, bool check_
ink_release_assert(this->_thread == this_ethread());
SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
- if (read_vio.nbytes > 0 && read_vio.ndone <= read_vio.nbytes) {
- // If this vio has a different buffer, we must copy
- ink_release_assert(this_ethread() == this->_thread);
- if (read_vio.buffer.writer() != (&request_buffer)) {
- int64_t num_to_read = read_vio.nbytes - read_vio.ndone;
- if (num_to_read > read_len) {
- num_to_read = read_len;
- }
- if (num_to_read > 0) {
- int bytes_added = read_vio.buffer.writer()->write(request_reader,
num_to_read);
- if (bytes_added > 0 || (check_eos && recv_end_stream)) {
- request_reader->consume(bytes_added);
- read_vio.ndone += bytes_added;
- int send_event = (read_vio.nbytes == read_vio.ndone ||
recv_end_stream) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY;
- if (call_update) { // Safe to call vio handler directly
- inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
- if (read_vio.cont && this->_sm) {
- read_vio.cont->handleEvent(send_event, &read_vio);
- }
- } else { // Called from do_io_read. Still setting things up. Send
event to handle this after the dust settles
- read_event = send_tracked_event(read_event, send_event, &read_vio);
- }
- }
- }
- } else {
- // Try to be smart and only signal if there was additional data
- int send_event = (read_vio.nbytes == read_vio.ndone) ?
VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY;
- if (request_reader->read_avail() > 0 || send_event ==
VC_EVENT_READ_COMPLETE) {
- if (call_update) { // Safe to call vio handler directly
- inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
- if (read_vio.cont && this->_sm) {
- read_vio.cont->handleEvent(send_event, &read_vio);
- }
- } else { // Called from do_io_read. Still setting things up. Send
event
- // to handle this after the dust settles
- read_event = send_tracked_event(read_event, send_event, &read_vio);
- }
+ if (read_vio.nbytes == 0) {
+ return;
+ }
+
+ // Try to be smart and only signal if there was additional data
+ int send_event = VC_EVENT_READ_READY;
+ if (read_vio.ntodo() == 0 || (this->recv_end_stream && this->read_vio.nbytes
!= INT64_MAX)) {
+ send_event = VC_EVENT_READ_COMPLETE;
+ }
+
+ int64_t read_avail = this->read_vio.buffer.writer()->max_read_avail();
+ if (read_avail > 0 || send_event == VC_EVENT_READ_COMPLETE) {
+ if (call_update) { // Safe to call vio handler directly
+ inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
+ if (read_vio.cont && this->_sm) {
+ read_vio.cont->handleEvent(send_event, &read_vio);
}
+ } else { // Called from do_io_read. Still setting things up. Send event
+ // to handle this after the dust settles
+ read_event = send_tracked_event(read_event, send_event, &read_vio);
}
}
}
@@ -671,6 +660,24 @@ Http2Stream::update_write_request(IOBufferReader
*buf_reader, int64_t write_len,
}
void
+Http2Stream::signal_read_event(int event)
+{
+ if (this->read_vio.cont == nullptr || this->read_vio.cont->mutex == nullptr
|| this->read_vio.op == VIO::NONE) {
+ return;
+ }
+
+ MUTEX_TRY_LOCK(lock, read_vio.cont->mutex, this_ethread());
+ if (lock.is_locked()) {
+ this->read_vio.cont->handleEvent(event, &this->read_vio);
+ } else {
+ if (this->_read_vio_event) {
+ this->_read_vio_event->cancel();
+ }
+ this->_read_vio_event = this_ethread()->schedule_imm(read_vio.cont, event,
&read_vio);
+ }
+}
+
+void
Http2Stream::signal_write_event(bool call_update)
{
if (this->write_vio.cont == nullptr || this->write_vio.op == VIO::NONE) {
@@ -794,7 +801,7 @@ Http2Stream::destroy()
response_header.destroy();
// Drop references to all buffer data
- request_buffer.clear();
+ this->_request_buffer.clear();
// Free the mutexes in the VIO
read_vio.mutex.clear();
diff --git a/proxy/http2/Http2Stream.h b/proxy/http2/Http2Stream.h
index 87ea36b..cc8bffc 100644
--- a/proxy/http2/Http2Stream.h
+++ b/proxy/http2/Http2Stream.h
@@ -73,7 +73,10 @@ public:
void terminate_if_possible();
void update_read_request(int64_t read_len, bool send_update, bool check_eos
= false);
void update_write_request(IOBufferReader *buf_reader, int64_t write_len,
bool send_update);
+
+ void signal_read_event(int event);
void signal_write_event(bool call_update);
+
void restart_sending();
bool push_promise(URL &url, const MIMEField *accept_encoding);
@@ -120,6 +123,7 @@ public:
void update_initial_rwnd(Http2WindowSize new_size);
bool has_trailing_header() const;
void set_request_headers(HTTPHdr &h2_headers);
+ MIOBuffer *read_vio_writer() const;
//////////////////
// Variables
@@ -138,8 +142,6 @@ public:
HTTPHdr response_header;
IOBufferReader *response_reader = nullptr;
- IOBufferReader *request_reader = nullptr;
- MIOBuffer request_buffer =
CLIENT_CONNECTION_FIRST_READ_BUFFER_SIZE_INDEX;
Http2DependencyTree::Node *priority_node = nullptr;
private:
@@ -161,6 +163,8 @@ private:
int64_t _http_sm_id = -1;
HTTPHdr _req_header;
+ MIOBuffer _request_buffer = CLIENT_CONNECTION_FIRST_READ_BUFFER_SIZE_INDEX;
+ int64_t read_vio_nbytes;
VIO read_vio;
VIO write_vio;
@@ -316,3 +320,9 @@ Http2Stream::is_first_transaction() const
{
return is_first_transaction_flag;
}
+
+inline MIOBuffer *
+Http2Stream::read_vio_writer() const
+{
+ return this->read_vio.get_writer();
+}