This is an automated email from the ASF dual-hosted git repository.

masaori pushed a commit to branch quic-latest
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/quic-latest by this push:
     new c327d2a  Transfer huge response using HttpTunnel flow control mechanism
c327d2a is described below

commit c327d2a40434a1a925a01cbb68de8a43bdfcb597
Author: Masaori Koshiba <masa...@apache.org>
AuthorDate: Wed Dec 6 10:13:59 2017 +0900

    Transfer huge response using HttpTunnel flow control mechanism
---
 iocore/net/quic/QUICApplication.cc | 24 +++++++++++++++++++++---
 iocore/net/quic/QUICApplication.h  |  8 ++------
 iocore/net/quic/QUICStream.cc      | 16 +++++++++++++++-
 proxy/hq/HQClientSession.cc        | 12 ++++++++++++
 proxy/hq/HQClientSession.h         |  1 +
 proxy/hq/HQClientTransaction.cc    | 16 ++++++++++++++++
 proxy/hq/QUICSimpleApp.cc          |  5 ++++-
 7 files changed, 71 insertions(+), 11 deletions(-)

diff --git a/iocore/net/quic/QUICApplication.cc 
b/iocore/net/quic/QUICApplication.cc
index 5d5a0ed..eca747f 100644
--- a/iocore/net/quic/QUICApplication.cc
+++ b/iocore/net/quic/QUICApplication.cc
@@ -71,10 +71,22 @@ QUICStreamIO::write(IOBufferReader *r, int64_t alen, 
int64_t offset)
 {
   SCOPED_MUTEX_LOCK(lock, this->_write_vio->mutex, this_ethread());
 
-  int64_t bytes_add = this->_write_buffer->write(r, alen, offset);
-  this->_write_vio->nbytes += bytes_add;
+  if (this->_write_buffer->write_avail() > 0) {
+    int64_t bytes_add = this->_write_buffer->write(r, alen, offset);
 
-  return bytes_add;
+    return bytes_add;
+
+  } else {
+    Debug(tag, "write buffer is full");
+
+    return 0;
+  }
+}
+
+void
+QUICStreamIO::set_write_vio_nbytes(int64_t nbytes)
+{
+  this->_write_vio->nbytes += nbytes;
 }
 
 void
@@ -101,6 +113,12 @@ QUICStreamIO::shutdown()
   return this->_stream->shutdown();
 }
 
+uint32_t
+QUICStreamIO::get_transaction_id() const
+{
+  return this->_stream->id();
+}
+
 //
 // QUICApplication
 //
diff --git a/iocore/net/quic/QUICApplication.h 
b/iocore/net/quic/QUICApplication.h
index 8094a4e..17e32e9 100644
--- a/iocore/net/quic/QUICApplication.h
+++ b/iocore/net/quic/QUICApplication.h
@@ -43,16 +43,12 @@ public:
   int64_t read(uint8_t *buf, int64_t len);
   int64_t write(const uint8_t *buf, int64_t len);
   int64_t write(IOBufferReader *r, int64_t len = INT64_MAX, int64_t offset = 
0);
+  void set_write_vio_nbytes(int64_t);
   void read_reenable();
   void write_reenable();
   IOBufferReader *get_read_buffer_reader();
   void shutdown();
-
-  int
-  get_transaction_id() const
-  {
-    return _stream->id();
-  }
+  uint32_t get_transaction_id() const;
 
 private:
   QUICStream *_stream = nullptr;
diff --git a/iocore/net/quic/QUICStream.cc b/iocore/net/quic/QUICStream.cc
index 8c9822d..0dc5cf0 100644
--- a/iocore/net/quic/QUICStream.cc
+++ b/iocore/net/quic/QUICStream.cc
@@ -101,6 +101,10 @@ QUICStream::main_event_handler(int event, void *data)
     this->_signal_write_event(true);
     this->_write_event = nullptr;
 
+    QUICStreamDebug("wvio.nbytes=%" PRId64 " wvio.ndone=%" PRId64 " 
wvio.read_avail=%" PRId64 " wvio.write_avail=%" PRId64,
+                    this->_write_vio.nbytes, this->_write_vio.ndone, 
this->_write_vio.get_reader()->read_avail(),
+                    this->_write_vio.get_writer()->write_avail());
+
     break;
   }
   case VC_EVENT_EOS:
@@ -257,6 +261,11 @@ QUICStream::_signal_read_event(bool direct)
 void
 QUICStream::_signal_write_event(bool direct)
 {
+  if (this->_write_vio.get_writer()->write_avail() == 0) {
+    QUICStreamDebug("wvio.write_avail=0");
+    return;
+  }
+
   int event          = (this->_write_vio.ntodo() == 0) ? 
VC_EVENT_WRITE_COMPLETE : VC_EVENT_WRITE_READY;
   Continuation *cont = this->_write_vio._cont;
 
@@ -334,7 +343,11 @@ QUICStream::recv(const std::shared_ptr<const 
QUICMaxStreamDataFrame> frame)
   QUICStreamFCDebug("[REMOTE] %" PRIu64 "/%" PRIu64, 
this->_remote_flow_controller->current_offset(),
                     this->_remote_flow_controller->current_limit());
 
-  this->reenable(&this->_write_vio);
+  // restart sending
+  QUICStreamDebug("restart sending");
+
+  this->_send();
+  this->_signal_write_event(false);
 
   return QUICErrorUPtr(new QUICNoError());
 }
@@ -348,6 +361,7 @@ QUICStream::recv(const std::shared_ptr<const 
QUICStreamBlockedFrame> frame)
 
 /**
  * @brief Send STREAM DATA from _response_buffer
+ * @detail Call _signal_write_event() to indicate event upper layer
  */
 QUICErrorUPtr
 QUICStream::_send()
diff --git a/proxy/hq/HQClientSession.cc b/proxy/hq/HQClientSession.cc
index 91b0035..af4d988 100644
--- a/proxy/hq/HQClientSession.cc
+++ b/proxy/hq/HQClientSession.cc
@@ -143,3 +143,15 @@ HQClientSession::add_transaction(HQClientTransaction 
*trans)
   this->_transaction_list.enqueue(trans);
   return;
 }
+
+// this->_transaction_list should be map?
+HQClientTransaction *
+HQClientSession::get_transaction(QUICStreamId id)
+{
+  for (HQClientTransaction *t = this->_transaction_list.head; t; t = 
static_cast<HQClientTransaction *>(t->link.next)) {
+    if (t->get_transaction_id() == static_cast<int>(id)) {
+      return t;
+    }
+  }
+  return nullptr;
+}
diff --git a/proxy/hq/HQClientSession.h b/proxy/hq/HQClientSession.h
index 964be0a..ccbae3d 100644
--- a/proxy/hq/HQClientSession.h
+++ b/proxy/hq/HQClientSession.h
@@ -54,6 +54,7 @@ public:
 
   // HQClientSession specific methods
   void add_transaction(HQClientTransaction *);
+  HQClientTransaction *get_transaction(QUICStreamId);
 
 private:
   NetVConnection *_client_vc = nullptr;
diff --git a/proxy/hq/HQClientTransaction.cc b/proxy/hq/HQClientTransaction.cc
index b30522e..cc3a255 100644
--- a/proxy/hq/HQClientTransaction.cc
+++ b/proxy/hq/HQClientTransaction.cc
@@ -28,6 +28,10 @@
 #include "HQClientSession.h"
 #include "HttpSM.h"
 
+// XXX this->parent->connection_id() is Session ID of HQClientSession. Should 
this be QUIC Connection ID?
+#define HQTransDebug(fmt, ...) \
+  Debug("hq_trans", "[%" PRId64 "] [%" PRIx32 "] " fmt, 
this->parent->connection_id(), this->get_transaction_id(), ##__VA_ARGS__)
+
 static void
 dump_io_buffer(IOBufferReader *reader)
 {
@@ -110,6 +114,11 @@ HQClientTransaction::main_event_handler(int event, void 
*edata)
     if (this->_write_vio.get_reader()->read_avail()) {
       this->_write_response();
     }
+
+    HQTransDebug("wvio.nbytes=%" PRId64 " wvio.ndone=%" PRId64 " 
wvio.read_avail=%" PRId64 " wvio.write_avail=%" PRId64,
+                 this->_write_vio.nbytes, this->_write_vio.ndone, 
this->_write_vio.get_reader()->read_avail(),
+                 this->_write_vio.get_writer()->write_avail());
+
     break;
   }
   default:
@@ -241,6 +250,8 @@ static constexpr char http_1_1_version[] = "HTTP/1.1";
 void
 HQClientTransaction::_write_response()
 {
+  SCOPED_MUTEX_LOCK(lock, this->_write_vio.mutex, this_ethread());
+
   IOBufferReader *reader = this->_write_vio.get_reader();
 
   if (memcmp(reader->start(), http_1_1_version, sizeof(http_1_1_version) - 1) 
== 0) {
@@ -249,6 +260,8 @@ HQClientTransaction::_write_response()
     int64_t headers_size   = headers->read_avail();
     reader->consume(headers_size);
     this->_write_vio.ndone += headers_size;
+    // The size of respons to client
+    this->_stream_io->set_write_vio_nbytes(this->_write_vio.nbytes - 
headers_size);
   }
 
   // Write HTTP/1.1 response body
@@ -256,6 +269,9 @@ HQClientTransaction::_write_response()
   int64_t total_written = 0;
   while (total_written < bytes_avail) {
     int64_t bytes_written = this->_stream_io->write(reader, bytes_avail);
+    if (bytes_written == 0) {
+      break;
+    }
     reader->consume(bytes_written);
     this->_write_vio.ndone += bytes_written;
     total_written += bytes_written;
diff --git a/proxy/hq/QUICSimpleApp.cc b/proxy/hq/QUICSimpleApp.cc
index f7424d1..a245642 100644
--- a/proxy/hq/QUICSimpleApp.cc
+++ b/proxy/hq/QUICSimpleApp.cc
@@ -76,7 +76,10 @@ QUICSimpleApp::main_event_handler(int event, Event *data)
   }
   case VC_EVENT_WRITE_READY:
   case VC_EVENT_WRITE_COMPLETE: {
-    // Nothing to do
+    HQClientTransaction *trans = 
this->_client_session->get_transaction(stream->id());
+
+    trans->handleEvent(event);
+
     break;
   }
   case VC_EVENT_EOS:

-- 
To stop receiving notification emails like this one, please contact
['"commits@trafficserver.apache.org" <commits@trafficserver.apache.org>'].

Reply via email to