This is an automated email from the ASF dual-hosted git repository. dmeden pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push: new 3e131d6a38 traffic_ctl: Remove the hardcoded big buffer and use a stream buffer (#10870) 3e131d6a38 is described below commit 3e131d6a38b578bbbb44e0b84d24ff70d276ef55 Author: Damian Meden <dme...@apache.org> AuthorDate: Fri Dec 1 12:21:47 2023 +0100 traffic_ctl: Remove the hardcoded big buffer and use a stream buffer (#10870) instead. On small responses this will just use the fixed buffer, if response contains more than the fixed buffer then it will save the data into a stringstream and reuse the already allocated buffer. --- include/shared/rpc/IPCSocketClient.h | 4 +- include/shared/rpc/RPCClient.h | 23 +++--- src/mgmt/rpc/server/unit_tests/test_rpcserver.cc | 14 ++-- src/shared/rpc/IPCSocketClient.cc | 98 ++++++++++++++++++++---- 4 files changed, 106 insertions(+), 33 deletions(-) diff --git a/include/shared/rpc/IPCSocketClient.h b/include/shared/rpc/IPCSocketClient.h index d738734a2a..73b49b499c 100644 --- a/include/shared/rpc/IPCSocketClient.h +++ b/include/shared/rpc/IPCSocketClient.h @@ -55,8 +55,8 @@ struct IPCSocketClient { /// Send all the passed string to the socket. self_reference send(std::string_view data); - /// Read all the content from the socket till the passed buffer is full. - ReadStatus read_all(swoc::FixedBufferWriter &bw); + /// Read all the content from the socket till the message is complete. + ReadStatus read_all(std::string &content); /// Closes the socket. void diff --git a/include/shared/rpc/RPCClient.h b/include/shared/rpc/RPCClient.h index 0408550e6f..d0e41fe172 100644 --- a/include/shared/rpc/RPCClient.h +++ b/include/shared/rpc/RPCClient.h @@ -26,6 +26,8 @@ #include <yaml-cpp/yaml.h> #include <tscore/Layout.h> +#include <tscore/ink_assert.h> + #include <swoc/BufferWriter.h> #include "shared/rpc/IPCSocketClient.h" @@ -38,10 +40,6 @@ namespace shared::rpc /// class RPCClient { - // Large buffer, as we may query a full list of records(metrics can be a lot bigger). - // TODO: should we add a parameter to increase the buffer? or maybe a record limit on the server's side? - static constexpr size_t BUFFER_SIZE{35600000}; - public: RPCClient() : _client(Layout::get()->runtimedir + "/jsonrpc20.sock") {} @@ -53,22 +51,27 @@ public: invoke(std::string_view req) { std::string err_text; // for error messages. - std::unique_ptr<char[]> buf(new char[BUFFER_SIZE]); - swoc::FixedBufferWriter bw{buf.get(), BUFFER_SIZE}; try { _client.connect(); if (!_client.is_closed()) { + std::string resp; _client.send(req); - switch (_client.read_all(bw)) { + switch (_client.read_all(resp)) { case IPCSocketClient::ReadStatus::NO_ERROR: { _client.disconnect(); - return {bw.data(), bw.size()}; + return resp; } case IPCSocketClient::ReadStatus::BUFFER_FULL: - swoc::bwprint(err_text, "Buffer full, not enough space to read the response. Buffer size: {}", BUFFER_SIZE); + // we don't expect this to happen as client will not let us know about + // this for now. Keep this in case we decide to put a limit on + // responses. + ink_assert(!"Buffer full, not enough space to read the response."); + break; + case IPCSocketClient::ReadStatus::STREAM_ERROR: + err_text = "STREAM_ERROR: Error while reading response."; break; default: - err_text = "Something happened, we can't read the response"; + err_text = "Something happened, we can't read the response. Unknown error."; break; } } else { diff --git a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc index 652ef11f8e..c08183e34a 100644 --- a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc +++ b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc @@ -190,21 +190,21 @@ struct ScopedLocalSocket : shared::rpc::IPCSocketClient { std::string read() { - swoc::LocalBufferWriter<32000> bw; - auto ret = super::read_all(bw); + std::string buf; + auto ret = super::read_all(buf); if (ret == ReadStatus::NO_ERROR) { - return {bw.data(), bw.size()}; + return buf; } return {}; } - // small wrapper function to deal with the bw. + // small wrapper function to deal with the flow. std::string query(std::string_view msg) { - swoc::LocalBufferWriter<32000> bw; - auto ret = connect().send(msg).read_all(bw); + std::string buf; + auto ret = connect().send(msg).read_all(buf); if (ret == ReadStatus::NO_ERROR) { - return {bw.data(), bw.size()}; + return buf; } return {}; diff --git a/src/shared/rpc/IPCSocketClient.cc b/src/shared/rpc/IPCSocketClient.cc index ce8363e9ae..3394dd5677 100644 --- a/src/shared/rpc/IPCSocketClient.cc +++ b/src/shared/rpc/IPCSocketClient.cc @@ -19,12 +19,83 @@ */ #include <stdexcept> #include <chrono> +#include <sstream> +#include <utility> #include "tscpp/util/ts_bw_format.h" #include "shared/rpc/IPCSocketClient.h" #include <tscore/ink_assert.h> +namespace +{ +/// @brief Simple buffer to store the jsonrpc server's response. +/// +/// With small content it will just use the LocalBufferWritter, if the +/// content gets bigger, then it will just save the buffer into a stream +/// and reuse the already created BufferWritter. +template <size_t N> class BufferStream +{ + std::ostringstream _os; + swoc::LocalBufferWriter<N> _bw; + size_t _written{0}; + +public: + char * + writable_data() + { + return _bw.aux_data(); + } + + void + save(size_t n) + { + _bw.commit(n); + + if (_bw.remaining() == 0) { // no more space available, flush what's on the bw + // and reset it. + flush(); + } + } + + size_t + available() const + { + return _bw.remaining(); + } + + void + flush() + { + if (_bw.size() == 0) { + return; + } + _os.write(_bw.view().data(), _bw.size()); + _written += _bw.size(); + + _bw.clear(); + } + + std::string + str() + { + if (stored() <= _bw.size()) { + return {_bw.data(), _bw.size()}; + } + + flush(); + return _os.str(); + } + + size_t + stored() const + { + return _written ? _written : _bw.size(); + } +}; + +} // namespace + namespace shared::rpc { IPCSocketClient::self_reference @@ -62,28 +133,26 @@ IPCSocketClient ::send(std::string_view data) } IPCSocketClient::ReadStatus -IPCSocketClient::read_all(swoc::FixedBufferWriter &bw) +IPCSocketClient::read_all(std::string &content) { if (this->is_closed()) { // we had a failure. return {}; } + + BufferStream<356000> bs; + ReadStatus readStatus{ReadStatus::UNKNOWN}; - while (bw.remaining()) { - swoc::MemSpan<char> span{bw.aux_data(), bw.remaining()}; - const ssize_t ret = ::read(_sock, span.data(), span.size()); + while (true) { + auto buf = bs.writable_data(); + const auto to_read = bs.available(); + const ssize_t ret = ::read(_sock, buf, to_read); + if (ret > 0) { - bw.commit(ret); - if (bw.remaining() > 0) { // some space available. - continue; - } else { - // buffer full. - readStatus = ReadStatus::BUFFER_FULL; - break; - } + bs.save(ret); + continue; } else { - if (bw.size()) { - // data was read. + if (bs.stored() > 0) { readStatus = ReadStatus::NO_ERROR; break; } @@ -91,6 +160,7 @@ IPCSocketClient::read_all(swoc::FixedBufferWriter &bw) break; } } + content = bs.str(); return readStatus; } } // namespace shared::rpc