This is an automated email from the ASF dual-hosted git repository. cmcfarlen pushed a commit to branch 10.0.x in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit f2da20fbd8d39b211235e153116e8d0d49da468b Author: Damian Meden <[email protected]> AuthorDate: Wed Jul 10 10:29:13 2024 +0200 Tools: JSON-RPC - Change the Unix Domain Socket client to retry and wait when connection to the server. (#11523) * Tools: JSON-RPC - Change the Unix Domain Socket client to retry and wait for the connection to be done. Very simple mechanism to wait and retry when the connect to the server socket is temporarily unavailable. (cherry picked from commit 91e3921962cd574a8f7edf3a16c7f9e9f66e393f) --- include/shared/rpc/IPCSocketClient.h | 4 +- include/shared/rpc/RPCClient.h | 2 +- src/mgmt/rpc/server/unit_tests/test_rpcserver.cc | 3 +- src/shared/rpc/IPCSocketClient.cc | 68 ++++++++++++++++++------ 4 files changed, 59 insertions(+), 18 deletions(-) diff --git a/include/shared/rpc/IPCSocketClient.h b/include/shared/rpc/IPCSocketClient.h index c93947e6c0..9c7273bc4c 100644 --- a/include/shared/rpc/IPCSocketClient.h +++ b/include/shared/rpc/IPCSocketClient.h @@ -31,6 +31,7 @@ namespace shared::rpc { +using namespace std::chrono_literals; /// The goal of this class is abstract the Unix Socket implementation and provide a JSONRPC Node client for Tests and client's /// applications like traffic_ctl and traffic_top. /// To make the usage easy and more readable this class provides a chained API, so you can do this like this: @@ -50,7 +51,8 @@ struct IPCSocketClient { ~IPCSocketClient() { this->disconnect(); } /// Connect to the configured socket path. - self_reference connect(); + /// Connection will retry every @c ms for @c attempts times if errno is EAGAIN + self_reference connect(std::chrono::milliseconds ms = 40ms, int attempts = 5); /// Send all the passed string to the socket. 
self_reference send(std::string_view data); diff --git a/include/shared/rpc/RPCClient.h b/include/shared/rpc/RPCClient.h index d0e41fe172..e4492f156c 100644 --- a/include/shared/rpc/RPCClient.h +++ b/include/shared/rpc/RPCClient.h @@ -68,7 +68,7 @@ public: ink_assert(!"Buffer full, not enough space to read the response."); break; case IPCSocketClient::ReadStatus::STREAM_ERROR: - err_text = "STREAM_ERROR: Error while reading response."; + err_text = swoc::bwprint(err_text, "STREAM_ERROR: Error while reading response. {}({})", std::strerror(errno), errno); break; default: err_text = "Something happened, we can't read the response. Unknown error."; diff --git a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc index 3a1689a0a8..7238450f34 100644 --- a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc +++ b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc @@ -43,6 +43,7 @@ #include "shared/rpc/IPCSocketClient.h" #include "iocore/eventsystem/EventSystem.h" #include "tscore/Layout.h" +#include "tscore/ink_sock.h" #include "iocore/utils/diags.i" #define DEFINE_JSONRPC_PROTO_FUNCTION(fn) swoc::Rv<YAML::Node> fn(std::string_view const &, const YAML::Node &params) @@ -158,7 +159,7 @@ struct ScopedLocalSocket : shared::rpc::IPCSocketClient { int chunk_number{1}; auto chunks = chunk<N>(data); for (auto &&part : chunks) { - if (::write(_sock, part.c_str(), part.size()) < 0) { + if (safe_write(_sock, part.c_str(), part.size()) < 0) { Debug(logTag, "error sending message :%s", std ::strerror(errno)); break; } diff --git a/src/shared/rpc/IPCSocketClient.cc b/src/shared/rpc/IPCSocketClient.cc index f9df7df0e4..a6c6215f93 100644 --- a/src/shared/rpc/IPCSocketClient.cc +++ b/src/shared/rpc/IPCSocketClient.cc @@ -17,15 +17,19 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ +#include <unistd.h> + #include <stdexcept> #include <chrono> #include <sstream> #include <utility> +#include <thread> #include "tsutil/ts_bw_format.h" #include "shared/rpc/IPCSocketClient.h" #include <tscore/ink_assert.h> +#include <tscore/ink_sock.h> namespace { @@ -93,29 +97,60 @@ public: return _written ? _written : _bw.size(); } }; - } // namespace namespace shared::rpc { + IPCSocketClient::self_reference -IPCSocketClient::connect() +IPCSocketClient::connect(std::chrono::milliseconds ms, int attempts) { + std::string text; + int err, tries{attempts}; + bool done{false}; _sock = socket(AF_UNIX, SOCK_STREAM, 0); + if (this->is_closed()) { - std::string text; - swoc::bwprint(text, "connect: error creating new socket. Why?: {}\n", std::strerror(errno)); - throw std::runtime_error{text}; + throw std::runtime_error(swoc::bwprint(text, "connect: error creating new socket. Reason: {}\n", std::strerror(errno))); } + + if (safe_fcntl(_sock, F_SETFL, O_NONBLOCK) < 0) { + this->close(); + throw std::runtime_error(swoc::bwprint(text, "connect: fcntl error. Reason: {}\n", std::strerror(errno))); + } + _server.sun_family = AF_UNIX; std::strncpy(_server.sun_path, _path.c_str(), sizeof(_server.sun_path) - 1); - if (::connect(_sock, (struct sockaddr *)&_server, sizeof(struct sockaddr_un)) < 0) { + + // Very simple connect and retry. We will just try to connect to the Unix Domain + // Socket and if it tell us to retry we just wait for a few ms and try again for + // X times. + do { + --tries; + if (::connect(_sock, (struct sockaddr *)&_server, sizeof(struct sockaddr_un)) >= 0) { + done = true; + break; + } + + if (errno == EAGAIN || errno == EINPROGRESS) { + // Connection cannot be completed immediately + // EAGAIN for UDS should suffice, but just in case. + std::this_thread::sleep_for(ms); + err = errno; + continue; + } else { + // No worth it. 
+ err = errno; + break; + } + } while (tries != 0); + + if ((tries == 0 && !done) || !done) { this->close(); - std::string text; - swoc::bwprint(text, "connect: Couldn't open connection with {}. Why?: {}\n", _path, std::strerror(errno)); - throw std::runtime_error{text}; + errno = err; + throw std::runtime_error(swoc::bwprint(text, "connect(attempts={}/{}): Couldn't open connection with {}. Last error: {}({})\n", + (attempts - tries), attempts, _path, std::strerror(errno), errno)); } - return *this; } @@ -123,10 +158,10 @@ IPCSocketClient::self_reference IPCSocketClient ::send(std::string_view data) { std::string msg{data}; - if (::write(_sock, msg.c_str(), msg.size()) < 0) { + if (safe_write(_sock, msg.c_str(), msg.size()) < 0) { this->close(); std::string text; - throw std::runtime_error{swoc::bwprint(text, "Error writing on stream socket {}", std ::strerror(errno))}; + throw std::runtime_error{swoc::bwprint(text, "Error writing on stream socket {}({})", std::strerror(errno), errno)}; } return *this; @@ -144,9 +179,12 @@ IPCSocketClient::read_all(std::string &content) ReadStatus readStatus{ReadStatus::UNKNOWN}; while (true) { - auto buf = bs.writable_data(); - const auto to_read = bs.available(); - const ssize_t ret = ::read(_sock, buf, to_read); + auto buf = bs.writable_data(); + const auto to_read = bs.available(); + ssize_t ret{-1}; + do { + ret = ::read(_sock, buf, to_read); + } while (ret < 0 && (errno == EAGAIN || errno == EINTR)); if (ret > 0) { bs.save(ret);
