This is an automated email from the ASF dual-hosted git repository.

dmeden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e131d6a38 traffic_ctl: Remove the hardcoded big buffer and use a stream buffer (#10870)
3e131d6a38 is described below

commit 3e131d6a38b578bbbb44e0b84d24ff70d276ef55
Author: Damian Meden <dme...@apache.org>
AuthorDate: Fri Dec 1 12:21:47 2023 +0100

    traffic_ctl: Remove the hardcoded big buffer and use a stream buffer (#10870)
    
    instead.
    On small responses this will just use the fixed buffer, if response
    contains more than the fixed buffer then it will save the data into a
    stringstream and reuse the already allocated buffer.
---
 include/shared/rpc/IPCSocketClient.h             |  4 +-
 include/shared/rpc/RPCClient.h                   | 23 +++---
 src/mgmt/rpc/server/unit_tests/test_rpcserver.cc | 14 ++--
 src/shared/rpc/IPCSocketClient.cc                | 98 ++++++++++++++++++++----
 4 files changed, 106 insertions(+), 33 deletions(-)

diff --git a/include/shared/rpc/IPCSocketClient.h b/include/shared/rpc/IPCSocketClient.h
index d738734a2a..73b49b499c 100644
--- a/include/shared/rpc/IPCSocketClient.h
+++ b/include/shared/rpc/IPCSocketClient.h
@@ -55,8 +55,8 @@ struct IPCSocketClient {
   /// Send all the passed string to the socket.
   self_reference send(std::string_view data);
 
-  /// Read all the content from the socket till the passed buffer is full.
-  ReadStatus read_all(swoc::FixedBufferWriter &bw);
+  /// Read all the content from the socket till the message is complete.
+  ReadStatus read_all(std::string &content);
 
   /// Closes the socket.
   void
diff --git a/include/shared/rpc/RPCClient.h b/include/shared/rpc/RPCClient.h
index 0408550e6f..d0e41fe172 100644
--- a/include/shared/rpc/RPCClient.h
+++ b/include/shared/rpc/RPCClient.h
@@ -26,6 +26,8 @@
 
 #include <yaml-cpp/yaml.h>
 #include <tscore/Layout.h>
+#include <tscore/ink_assert.h>
+
 #include <swoc/BufferWriter.h>
 
 #include "shared/rpc/IPCSocketClient.h"
@@ -38,10 +40,6 @@ namespace shared::rpc
 ///
 class RPCClient
 {
-  // Large buffer, as we may query a full list of records(metrics can be a lot bigger).
-  // TODO: should we add a parameter to increase the buffer? or maybe a record limit on the server's side?
-  static constexpr size_t BUFFER_SIZE{35600000};
-
 public:
   RPCClient() : _client(Layout::get()->runtimedir + "/jsonrpc20.sock") {}
 
@@ -53,22 +51,27 @@ public:
   invoke(std::string_view req)
   {
     std::string err_text; // for error messages.
-    std::unique_ptr<char[]> buf(new char[BUFFER_SIZE]);
-    swoc::FixedBufferWriter bw{buf.get(), BUFFER_SIZE};
     try {
       _client.connect();
       if (!_client.is_closed()) {
+        std::string resp;
         _client.send(req);
-        switch (_client.read_all(bw)) {
+        switch (_client.read_all(resp)) {
         case IPCSocketClient::ReadStatus::NO_ERROR: {
           _client.disconnect();
-          return {bw.data(), bw.size()};
+          return resp;
         }
         case IPCSocketClient::ReadStatus::BUFFER_FULL:
-          swoc::bwprint(err_text, "Buffer full, not enough space to read the response. Buffer size: {}", BUFFER_SIZE);
+          // we don't expect this to happen as client will not let us know about
+          // this for now. Keep this in case we decide to put a limit on
+          // responses.
+          ink_assert(!"Buffer full, not enough space to read the response.");
+          break;
+        case IPCSocketClient::ReadStatus::STREAM_ERROR:
+          err_text = "STREAM_ERROR: Error while reading response.";
           break;
         default:
-          err_text = "Something happened, we can't read the response";
+          err_text = "Something happened, we can't read the response. Unknown error.";
           break;
         }
       } else {
diff --git a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
index 652ef11f8e..c08183e34a 100644
--- a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
+++ b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
@@ -190,21 +190,21 @@ struct ScopedLocalSocket : shared::rpc::IPCSocketClient {
   std::string
   read()
   {
-    swoc::LocalBufferWriter<32000> bw;
-    auto ret = super::read_all(bw);
+    std::string buf;
+    auto ret = super::read_all(buf);
     if (ret == ReadStatus::NO_ERROR) {
-      return {bw.data(), bw.size()};
+      return buf;
     }
     return {};
   }
-  // small wrapper function to deal with the bw.
+  // small wrapper function to deal with the flow.
   std::string
   query(std::string_view msg)
   {
-    swoc::LocalBufferWriter<32000> bw;
-    auto ret = connect().send(msg).read_all(bw);
+    std::string buf;
+    auto ret = connect().send(msg).read_all(buf);
     if (ret == ReadStatus::NO_ERROR) {
-      return {bw.data(), bw.size()};
+      return buf;
     }
 
     return {};
diff --git a/src/shared/rpc/IPCSocketClient.cc b/src/shared/rpc/IPCSocketClient.cc
index ce8363e9ae..3394dd5677 100644
--- a/src/shared/rpc/IPCSocketClient.cc
+++ b/src/shared/rpc/IPCSocketClient.cc
@@ -19,12 +19,83 @@
 */
 #include <stdexcept>
 #include <chrono>
+#include <sstream>
+#include <utility>
 
 #include "tscpp/util/ts_bw_format.h"
 
 #include "shared/rpc/IPCSocketClient.h"
 #include <tscore/ink_assert.h>
 
+namespace
+{
+/// @brief Simple buffer to store the jsonrpc server's response.
+///
+///        With small content it will just use the LocalBufferWritter, if the
+///        content gets bigger, then it will just save the buffer into a stream
+///        and reuse the already created BufferWritter.
+template <size_t N> class BufferStream
+{
+  std::ostringstream _os;
+  swoc::LocalBufferWriter<N> _bw;
+  size_t _written{0};
+
+public:
+  char *
+  writable_data()
+  {
+    return _bw.aux_data();
+  }
+
+  void
+  save(size_t n)
+  {
+    _bw.commit(n);
+
+    if (_bw.remaining() == 0) { // no more space available, flush what's on the bw
+                                // and reset it.
+      flush();
+    }
+  }
+
+  size_t
+  available() const
+  {
+    return _bw.remaining();
+  }
+
+  void
+  flush()
+  {
+    if (_bw.size() == 0) {
+      return;
+    }
+    _os.write(_bw.view().data(), _bw.size());
+    _written += _bw.size();
+
+    _bw.clear();
+  }
+
+  std::string
+  str()
+  {
+    if (stored() <= _bw.size()) {
+      return {_bw.data(), _bw.size()};
+    }
+
+    flush();
+    return _os.str();
+  }
+
+  size_t
+  stored() const
+  {
+    return _written ? _written : _bw.size();
+  }
+};
+
+} // namespace
+
 namespace shared::rpc
 {
 IPCSocketClient::self_reference
@@ -62,28 +133,26 @@ IPCSocketClient ::send(std::string_view data)
 }
 
 IPCSocketClient::ReadStatus
-IPCSocketClient::read_all(swoc::FixedBufferWriter &bw)
+IPCSocketClient::read_all(std::string &content)
 {
   if (this->is_closed()) {
     // we had a failure.
     return {};
   }
+
+  BufferStream<356000> bs;
+
   ReadStatus readStatus{ReadStatus::UNKNOWN};
-  while (bw.remaining()) {
-    swoc::MemSpan<char> span{bw.aux_data(), bw.remaining()};
-    const ssize_t ret = ::read(_sock, span.data(), span.size());
+  while (true) {
+    auto buf           = bs.writable_data();
+    const auto to_read = bs.available();
+    const ssize_t ret  = ::read(_sock, buf, to_read);
+
     if (ret > 0) {
-      bw.commit(ret);
-      if (bw.remaining() > 0) { // some space available.
-        continue;
-      } else {
-        // buffer full.
-        readStatus = ReadStatus::BUFFER_FULL;
-        break;
-      }
+      bs.save(ret);
+      continue;
     } else {
-      if (bw.size()) {
-        // data was read.
+      if (bs.stored() > 0) {
         readStatus = ReadStatus::NO_ERROR;
         break;
       }
@@ -91,6 +160,7 @@ IPCSocketClient::read_all(swoc::FixedBufferWriter &bw)
       break;
     }
   }
+  content = bs.str();
   return readStatus;
 }
 } // namespace shared::rpc

Reply via email to