This is an automated email from the ASF dual-hosted git repository.
dmeden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 3e131d6a38 traffic_ctl: Remove the hardcoded big buffer and use a
stream buffer (#10870)
3e131d6a38 is described below
commit 3e131d6a38b578bbbb44e0b84d24ff70d276ef55
Author: Damian Meden <[email protected]>
AuthorDate: Fri Dec 1 12:21:47 2023 +0100
traffic_ctl: Remove the hardcoded big buffer and use a stream buffer
instead (#10870)
For small responses this just uses the fixed buffer. If the response
contains more data than the fixed buffer can hold, the data is saved into
a stringstream and the already allocated buffer is reused.
---
include/shared/rpc/IPCSocketClient.h | 4 +-
include/shared/rpc/RPCClient.h | 23 +++---
src/mgmt/rpc/server/unit_tests/test_rpcserver.cc | 14 ++--
src/shared/rpc/IPCSocketClient.cc | 98 ++++++++++++++++++++----
4 files changed, 106 insertions(+), 33 deletions(-)
diff --git a/include/shared/rpc/IPCSocketClient.h
b/include/shared/rpc/IPCSocketClient.h
index d738734a2a..73b49b499c 100644
--- a/include/shared/rpc/IPCSocketClient.h
+++ b/include/shared/rpc/IPCSocketClient.h
@@ -55,8 +55,8 @@ struct IPCSocketClient {
/// Send all the passed string to the socket.
self_reference send(std::string_view data);
- /// Read all the content from the socket till the passed buffer is full.
- ReadStatus read_all(swoc::FixedBufferWriter &bw);
+ /// Read all the content from the socket till the message is complete.
+ ReadStatus read_all(std::string &content);
/// Closes the socket.
void
diff --git a/include/shared/rpc/RPCClient.h b/include/shared/rpc/RPCClient.h
index 0408550e6f..d0e41fe172 100644
--- a/include/shared/rpc/RPCClient.h
+++ b/include/shared/rpc/RPCClient.h
@@ -26,6 +26,8 @@
#include <yaml-cpp/yaml.h>
#include <tscore/Layout.h>
+#include <tscore/ink_assert.h>
+
#include <swoc/BufferWriter.h>
#include "shared/rpc/IPCSocketClient.h"
@@ -38,10 +40,6 @@ namespace shared::rpc
///
class RPCClient
{
- // Large buffer, as we may query a full list of records(metrics can be a lot
bigger).
- // TODO: should we add a parameter to increase the buffer? or maybe a record
limit on the server's side?
- static constexpr size_t BUFFER_SIZE{35600000};
-
public:
RPCClient() : _client(Layout::get()->runtimedir + "/jsonrpc20.sock") {}
@@ -53,22 +51,27 @@ public:
invoke(std::string_view req)
{
std::string err_text; // for error messages.
- std::unique_ptr<char[]> buf(new char[BUFFER_SIZE]);
- swoc::FixedBufferWriter bw{buf.get(), BUFFER_SIZE};
try {
_client.connect();
if (!_client.is_closed()) {
+ std::string resp;
_client.send(req);
- switch (_client.read_all(bw)) {
+ switch (_client.read_all(resp)) {
case IPCSocketClient::ReadStatus::NO_ERROR: {
_client.disconnect();
- return {bw.data(), bw.size()};
+ return resp;
}
case IPCSocketClient::ReadStatus::BUFFER_FULL:
- swoc::bwprint(err_text, "Buffer full, not enough space to read the
response. Buffer size: {}", BUFFER_SIZE);
+ // we don't expect this to happen as client will not let us know
about
+ // this for now. Keep this in case we decide to put a limit on
+ // responses.
+ ink_assert(!"Buffer full, not enough space to read the response.");
+ break;
+ case IPCSocketClient::ReadStatus::STREAM_ERROR:
+ err_text = "STREAM_ERROR: Error while reading response.";
break;
default:
- err_text = "Something happened, we can't read the response";
+ err_text = "Something happened, we can't read the response. Unknown
error.";
break;
}
} else {
diff --git a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
index 652ef11f8e..c08183e34a 100644
--- a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
+++ b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
@@ -190,21 +190,21 @@ struct ScopedLocalSocket : shared::rpc::IPCSocketClient {
std::string
read()
{
- swoc::LocalBufferWriter<32000> bw;
- auto ret = super::read_all(bw);
+ std::string buf;
+ auto ret = super::read_all(buf);
if (ret == ReadStatus::NO_ERROR) {
- return {bw.data(), bw.size()};
+ return buf;
}
return {};
}
- // small wrapper function to deal with the bw.
+ // small wrapper function to deal with the flow.
std::string
query(std::string_view msg)
{
- swoc::LocalBufferWriter<32000> bw;
- auto ret = connect().send(msg).read_all(bw);
+ std::string buf;
+ auto ret = connect().send(msg).read_all(buf);
if (ret == ReadStatus::NO_ERROR) {
- return {bw.data(), bw.size()};
+ return buf;
}
return {};
diff --git a/src/shared/rpc/IPCSocketClient.cc
b/src/shared/rpc/IPCSocketClient.cc
index ce8363e9ae..3394dd5677 100644
--- a/src/shared/rpc/IPCSocketClient.cc
+++ b/src/shared/rpc/IPCSocketClient.cc
@@ -19,12 +19,83 @@
*/
#include <stdexcept>
#include <chrono>
+#include <sstream>
+#include <utility>
#include "tscpp/util/ts_bw_format.h"
#include "shared/rpc/IPCSocketClient.h"
#include <tscore/ink_assert.h>
+namespace
+{
+/// @brief Simple buffer to store the jsonrpc server's response.
+///
+/// With small content it will just use the LocalBufferWritter, if the
+/// content gets bigger, then it will just save the buffer into a stream
+/// and reuse the already created BufferWritter.
+template <size_t N> class BufferStream
+{
+ std::ostringstream _os;
+ swoc::LocalBufferWriter<N> _bw;
+ size_t _written{0};
+
+public:
+ char *
+ writable_data()
+ {
+ return _bw.aux_data();
+ }
+
+ void
+ save(size_t n)
+ {
+ _bw.commit(n);
+
+ if (_bw.remaining() == 0) { // no more space available, flush what's on
the bw
+ // and reset it.
+ flush();
+ }
+ }
+
+ size_t
+ available() const
+ {
+ return _bw.remaining();
+ }
+
+ void
+ flush()
+ {
+ if (_bw.size() == 0) {
+ return;
+ }
+ _os.write(_bw.view().data(), _bw.size());
+ _written += _bw.size();
+
+ _bw.clear();
+ }
+
+ std::string
+ str()
+ {
+ if (stored() <= _bw.size()) {
+ return {_bw.data(), _bw.size()};
+ }
+
+ flush();
+ return _os.str();
+ }
+
+ size_t
+ stored() const
+ {
+ return _written ? _written : _bw.size();
+ }
+};
+
+} // namespace
+
namespace shared::rpc
{
IPCSocketClient::self_reference
@@ -62,28 +133,26 @@ IPCSocketClient ::send(std::string_view data)
}
IPCSocketClient::ReadStatus
-IPCSocketClient::read_all(swoc::FixedBufferWriter &bw)
+IPCSocketClient::read_all(std::string &content)
{
if (this->is_closed()) {
// we had a failure.
return {};
}
+
+ BufferStream<356000> bs;
+
ReadStatus readStatus{ReadStatus::UNKNOWN};
- while (bw.remaining()) {
- swoc::MemSpan<char> span{bw.aux_data(), bw.remaining()};
- const ssize_t ret = ::read(_sock, span.data(), span.size());
+ while (true) {
+ auto buf = bs.writable_data();
+ const auto to_read = bs.available();
+ const ssize_t ret = ::read(_sock, buf, to_read);
+
if (ret > 0) {
- bw.commit(ret);
- if (bw.remaining() > 0) { // some space available.
- continue;
- } else {
- // buffer full.
- readStatus = ReadStatus::BUFFER_FULL;
- break;
- }
+ bs.save(ret);
+ continue;
} else {
- if (bw.size()) {
- // data was read.
+ if (bs.stored() > 0) {
readStatus = ReadStatus::NO_ERROR;
break;
}
@@ -91,6 +160,7 @@ IPCSocketClient::read_all(swoc::FixedBufferWriter &bw)
break;
}
}
+ content = bs.str();
return readStatus;
}
} // namespace shared::rpc