This is an automated email from the ASF dual-hosted git repository.
dmeden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 91e3921962 Tools: JSON-RPC - Change the Unix Domain Socket client to
retry and wait when connection to the server. (#11523)
91e3921962 is described below
commit 91e3921962cd574a8f7edf3a16c7f9e9f66e393f
Author: Damian Meden <[email protected]>
AuthorDate: Wed Jul 10 10:29:13 2024 +0200
Tools: JSON-RPC - Change the Unix Domain Socket client to retry and wait
when connection to the server. (#11523)
* Tools: JSON-RPC - Change the Unix Domain Socket client to retry and wait
for the connection to be done.
Very simple mechanism to wait and retry when the connection to the server
socket is temporarily unavailable.
---
include/shared/rpc/IPCSocketClient.h | 4 +-
include/shared/rpc/RPCClient.h | 2 +-
src/mgmt/rpc/server/unit_tests/test_rpcserver.cc | 3 +-
src/shared/rpc/IPCSocketClient.cc | 68 ++++++++++++++++++------
4 files changed, 59 insertions(+), 18 deletions(-)
diff --git a/include/shared/rpc/IPCSocketClient.h b/include/shared/rpc/IPCSocketClient.h
index c93947e6c0..9c7273bc4c 100644
--- a/include/shared/rpc/IPCSocketClient.h
+++ b/include/shared/rpc/IPCSocketClient.h
@@ -31,6 +31,7 @@
namespace shared::rpc
{
+using namespace std::chrono_literals;
/// The goal of this class is abstract the Unix Socket implementation and
provide a JSONRPC Node client for Tests and client's
/// applications like traffic_ctl and traffic_top.
/// To make the usage easy and more readable this class provides a chained
API, so you can do this like this:
@@ -50,7 +51,8 @@ struct IPCSocketClient {
~IPCSocketClient() { this->disconnect(); }
/// Connect to the configured socket path.
- self_reference connect();
+ /// Connection will retry every @c ms for @c attempts times if errno is
EAGAIN
+ self_reference connect(std::chrono::milliseconds ms = 40ms, int attempts =
5);
/// Send all the passed string to the socket.
self_reference send(std::string_view data);
diff --git a/include/shared/rpc/RPCClient.h b/include/shared/rpc/RPCClient.h
index d0e41fe172..e4492f156c 100644
--- a/include/shared/rpc/RPCClient.h
+++ b/include/shared/rpc/RPCClient.h
@@ -68,7 +68,7 @@ public:
ink_assert(!"Buffer full, not enough space to read the response.");
break;
case IPCSocketClient::ReadStatus::STREAM_ERROR:
- err_text = "STREAM_ERROR: Error while reading response.";
+ err_text = swoc::bwprint(err_text, "STREAM_ERROR: Error while
reading response. {}({})", std::strerror(errno), errno);
break;
default:
err_text = "Something happened, we can't read the response. Unknown
error.";
diff --git a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
index 3a1689a0a8..7238450f34 100644
--- a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
+++ b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
@@ -43,6 +43,7 @@
#include "shared/rpc/IPCSocketClient.h"
#include "iocore/eventsystem/EventSystem.h"
#include "tscore/Layout.h"
+#include "tscore/ink_sock.h"
#include "iocore/utils/diags.i"
#define DEFINE_JSONRPC_PROTO_FUNCTION(fn) swoc::Rv<YAML::Node>
fn(std::string_view const &, const YAML::Node &params)
@@ -158,7 +159,7 @@ struct ScopedLocalSocket : shared::rpc::IPCSocketClient {
int chunk_number{1};
auto chunks = chunk<N>(data);
for (auto &&part : chunks) {
- if (::write(_sock, part.c_str(), part.size()) < 0) {
+ if (safe_write(_sock, part.c_str(), part.size()) < 0) {
Debug(logTag, "error sending message :%s", std ::strerror(errno));
break;
}
diff --git a/src/shared/rpc/IPCSocketClient.cc b/src/shared/rpc/IPCSocketClient.cc
index f9df7df0e4..a6c6215f93 100644
--- a/src/shared/rpc/IPCSocketClient.cc
+++ b/src/shared/rpc/IPCSocketClient.cc
@@ -17,15 +17,19 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
+#include <unistd.h>
+
#include <stdexcept>
#include <chrono>
#include <sstream>
#include <utility>
+#include <thread>
#include "tsutil/ts_bw_format.h"
#include "shared/rpc/IPCSocketClient.h"
#include <tscore/ink_assert.h>
+#include <tscore/ink_sock.h>
namespace
{
@@ -93,29 +97,60 @@ public:
return _written ? _written : _bw.size();
}
};
-
} // namespace
namespace shared::rpc
{
+
IPCSocketClient::self_reference
-IPCSocketClient::connect()
+IPCSocketClient::connect(std::chrono::milliseconds ms, int attempts)
{
+ std::string text;
+ int err, tries{attempts};
+ bool done{false};
_sock = socket(AF_UNIX, SOCK_STREAM, 0);
+
if (this->is_closed()) {
- std::string text;
- swoc::bwprint(text, "connect: error creating new socket. Why?: {}\n",
std::strerror(errno));
- throw std::runtime_error{text};
+ throw std::runtime_error(swoc::bwprint(text, "connect: error creating new
socket. Reason: {}\n", std::strerror(errno)));
}
+
+ if (safe_fcntl(_sock, F_SETFL, O_NONBLOCK) < 0) {
+ this->close();
+ throw std::runtime_error(swoc::bwprint(text, "connect: fcntl error.
Reason: {}\n", std::strerror(errno)));
+ }
+
_server.sun_family = AF_UNIX;
std::strncpy(_server.sun_path, _path.c_str(), sizeof(_server.sun_path) - 1);
- if (::connect(_sock, (struct sockaddr *)&_server, sizeof(struct
sockaddr_un)) < 0) {
+
+ // Very simple connect and retry. We will just try to connect to the Unix
Domain
+ // Socket and if it tell us to retry we just wait for a few ms and try again
for
+ // X times.
+ do {
+ --tries;
+ if (::connect(_sock, (struct sockaddr *)&_server, sizeof(struct
sockaddr_un)) >= 0) {
+ done = true;
+ break;
+ }
+
+ if (errno == EAGAIN || errno == EINPROGRESS) {
+ // Connection cannot be completed immediately
+ // EAGAIN for UDS should suffice, but just in case.
+ std::this_thread::sleep_for(ms);
+ err = errno;
+ continue;
+ } else {
+ // No worth it.
+ err = errno;
+ break;
+ }
+ } while (tries != 0);
+
+ if ((tries == 0 && !done) || !done) {
this->close();
- std::string text;
- swoc::bwprint(text, "connect: Couldn't open connection with {}. Why?:
{}\n", _path, std::strerror(errno));
- throw std::runtime_error{text};
+ errno = err;
+ throw std::runtime_error(swoc::bwprint(text, "connect(attempts={}/{}):
Couldn't open connection with {}. Last error: {}({})\n",
+ (attempts - tries), attempts,
_path, std::strerror(errno), errno));
}
-
return *this;
}
@@ -123,10 +158,10 @@ IPCSocketClient::self_reference
IPCSocketClient ::send(std::string_view data)
{
std::string msg{data};
- if (::write(_sock, msg.c_str(), msg.size()) < 0) {
+ if (safe_write(_sock, msg.c_str(), msg.size()) < 0) {
this->close();
std::string text;
- throw std::runtime_error{swoc::bwprint(text, "Error writing on stream
socket {}", std ::strerror(errno))};
+ throw std::runtime_error{swoc::bwprint(text, "Error writing on stream
socket {}({})", std::strerror(errno), errno)};
}
return *this;
@@ -144,9 +179,12 @@ IPCSocketClient::read_all(std::string &content)
ReadStatus readStatus{ReadStatus::UNKNOWN};
while (true) {
- auto buf = bs.writable_data();
- const auto to_read = bs.available();
- const ssize_t ret = ::read(_sock, buf, to_read);
+ auto buf = bs.writable_data();
+ const auto to_read = bs.available();
+ ssize_t ret{-1};
+ do {
+ ret = ::read(_sock, buf, to_read);
+ } while (ret < 0 && (errno == EAGAIN || errno == EINTR));
if (ret > 0) {
bs.save(ret);