This is an automated email from the ASF dual-hosted git repository.
achennaka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 19bade3d6 [rpc] modernize code a bit
19bade3d6 is described below
commit 19bade3d6ed8422591d32f48df64d3b4cdf5dc55
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Dec 15 19:57:38 2023 -0800
[rpc] modernize code a bit
Since I'm updating the code in src/kudu/rpc/connection.{h,cc} and
src/kudu/util/net/socket.{h,cc} in follow-up changelists, I went ahead
and updated the code to be style-compliant, modernized it, and made
other assorted improvements before introducing the new functionality.
Change-Id: I2634591426c3e107e4d11b661ff62e5cde8a7570
Reviewed-on: http://gerrit.cloudera.org:8080/20815
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Mahesh Reddy <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
---
src/kudu/rpc/connection.cc | 288 ++++++++++++++++++++++++--------------------
src/kudu/rpc/connection.h | 27 ++---
src/kudu/rpc/messenger.cc | 18 ++-
src/kudu/rpc/messenger.h | 6 +-
src/kudu/rpc/proxy.cc | 54 +++++----
src/kudu/rpc/reactor.cc | 23 ++--
src/kudu/rpc/reactor.h | 8 +-
src/kudu/util/net/socket.cc | 94 ++++++++++-----
src/kudu/util/net/socket.h | 25 ++--
9 files changed, 303 insertions(+), 240 deletions(-)
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index ddf8dd3e5..c6727b805 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -19,10 +19,14 @@
#include <netinet/in.h>
#include <netinet/tcp.h>
-#include <string.h>
+#include <sys/socket.h>
+#ifdef __linux__
+#include <sys/ioctl.h>
+#endif
#include <algorithm>
#include <cerrno>
+#include <cstddef>
#include <iostream>
#include <memory>
#include <set>
@@ -53,15 +57,11 @@
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
-#include <sys/socket.h>
-#ifdef __linux__
-#include <sys/ioctl.h>
-#endif
-
using kudu::security::TlsSocket;
using std::includes;
using std::set;
using std::shared_ptr;
+using std::string;
using std::unique_ptr;
using strings::Substitute;
@@ -199,8 +199,8 @@ struct tcp_info {
///
/// Connection
///
-Connection::Connection(ReactorThread *reactor_thread,
- Sockaddr remote,
+Connection::Connection(ReactorThread* reactor_thread,
+ const Sockaddr& remote,
unique_ptr<Socket> socket,
Direction direction,
CredentialsPolicy policy)
@@ -231,7 +231,7 @@ Status Connection::SetTcpKeepAlive(int idle_time_s, int
retry_time_s, int num_re
void Connection::EpollRegister(ev::loop_ref& loop) {
DCHECK(reactor_thread_->IsCurrentThread());
- DVLOG(4) << "Registering connection for epoll: " << ToString();
+ DVLOG(4) << Substitute("registering connection for epoll: $0", ToString());
write_io_.set(loop);
write_io_.set(socket_->GetFd(), ev::WRITE);
write_io_.set<Connection, &Connection::WriteHandler>(this);
@@ -259,7 +259,7 @@ Connection::~Connection() {
bool Connection::Idle() const {
DCHECK(reactor_thread_->IsCurrentThread());
// check if we're in the middle of receiving something
- InboundTransfer *transfer = inbound_.get();
+ InboundTransfer* transfer = inbound_.get();
if (transfer && (transfer->TransferStarted())) {
return false;
}
@@ -284,7 +284,7 @@ bool Connection::Idle() const {
return true;
}
-void Connection::Shutdown(const Status &status,
+void Connection::Shutdown(const Status& status,
unique_ptr<ErrorStatusPB> rpc_error) {
DCHECK(reactor_thread_->IsCurrentThread());
shutdown_status_ = status.CloneAndPrepend("RPC connection failed");
@@ -292,16 +292,17 @@ void Connection::Shutdown(const Status &status,
if (inbound_ && inbound_->TransferStarted()) {
double secs_since_active =
(reactor_thread_->cur_time() - last_activity_time_).ToSeconds();
- LOG(WARNING) << "Shutting down " << ToString()
- << " with pending inbound data ("
- << inbound_->StatusAsString() << ", last active "
- << HumanReadableElapsedTime::ToShortString(secs_since_active)
- << " ago, status=" << status.ToString() << ")";
+ LOG(WARNING) << Substitute(
+ "shutting down $0 with pending inbound data: "
+ "$1; last active $2 ago: status $3",
+ ToString(),
+ inbound_->StatusAsString(),
+ HumanReadableElapsedTime::ToShortString(secs_since_active),
+ status.ToString());
}
// Clear any calls which have been sent and were awaiting a response.
- for (const car_map_t::value_type &v : awaiting_response_) {
- CallAwaitingResponse *c = v.second;
+ for (const auto& [_, c] : awaiting_response_) {
if (c->call) {
// Make sure every awaiting call receives the error info, if any.
unique_ptr<ErrorStatusPB> error;
@@ -320,7 +321,7 @@ void Connection::Shutdown(const Status &status,
// Clear any outbound transfers.
while (!outbound_transfers_.empty()) {
- OutboundTransfer *t = &outbound_transfers_.front();
+ auto* t = &outbound_transfers_.front();
outbound_transfers_.pop_front();
delete t;
}
@@ -336,14 +337,14 @@ void Connection::Shutdown(const Status &status,
void Connection::QueueOutbound(unique_ptr<OutboundTransfer> transfer) {
DCHECK(reactor_thread_->IsCurrentThread());
- if (!shutdown_status_.ok()) {
+ if (PREDICT_FALSE(!shutdown_status_.ok())) {
// If we've already shut down, then we just need to abort the
// transfer rather than bothering to queue it.
transfer->Abort(shutdown_status_);
return;
}
- DVLOG(3) << "Queueing transfer: " << transfer->HexDump();
+ DVLOG(3) << Substitute("queueing transfer: $0", transfer->HexDump());
outbound_transfers_.push_back(*transfer.release());
@@ -360,14 +361,16 @@ Connection::CallAwaitingResponse::~CallAwaitingResponse()
{
DCHECK(conn->reactor_thread_->IsCurrentThread());
}
-void Connection::CallAwaitingResponse::HandleTimeout(ev::timer &watcher, int
revents) {
+void Connection::CallAwaitingResponse::HandleTimeout(ev::timer& watcher,
+ int /*revents*/) {
if (remaining_timeout > 0) {
- if (watcher.remaining() < -1.0) {
- LOG(WARNING) << "RPC call timeout handler was delayed by "
- << -watcher.remaining() << "s! This may be due to a
process-wide "
- << "pause such as swapping, logging-related delays, or
allocator lock "
- << "contention. Will allow an additional "
- << remaining_timeout << "s for a response.";
+ const auto rem = watcher.remaining();
+ if (PREDICT_FALSE(rem < -1.0)) {
+ LOG(WARNING) << Substitute(
+ "RPC call timeout handler was delayed by $0s: this may be due "
+ "to a process-wide pause such as swapping, logging-related delays, "
+ "or allocator lock contention. Will allow extra $1s for a response",
+ rem, remaining_timeout);
}
watcher.set(remaining_timeout, 0);
@@ -379,7 +382,7 @@ void
Connection::CallAwaitingResponse::HandleTimeout(ev::timer &watcher, int rev
conn->HandleOutboundCallTimeout(this);
}
-void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
+void Connection::HandleOutboundCallTimeout(CallAwaitingResponse* car) {
DCHECK(reactor_thread_->IsCurrentThread());
if (!car->call) {
// The RPC may have been cancelled before the timeout was hit.
@@ -407,7 +410,7 @@ void
Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
// already timed out.
}
-void Connection::CancelOutboundCall(const shared_ptr<OutboundCall> &call) {
+void Connection::CancelOutboundCall(const shared_ptr<OutboundCall>& call) {
CallAwaitingResponse* car = FindPtrOrNull(awaiting_response_,
call->call_id());
if (car != nullptr) {
// car->call may be NULL if the call has timed out already.
@@ -423,7 +426,7 @@ Status Connection::GetLocalAddress(Sockaddr* addr) const {
}
// Inject a cancellation when 'call' is in state
'FLAGS_rpc_inject_cancellation_state'.
-void inline Connection::MaybeInjectCancellation(const shared_ptr<OutboundCall>
&call) {
+void inline Connection::MaybeInjectCancellation(const
shared_ptr<OutboundCall>& call) {
if (PREDICT_FALSE(call->ShouldInjectCancellation())) {
reactor_thread_->reactor()->messenger()->QueueCancellation(call);
}
@@ -435,7 +438,7 @@ void inline Connection::MaybeInjectCancellation(const
shared_ptr<OutboundCall> &
struct CallTransferCallbacks : public TransferCallbacks {
public:
explicit CallTransferCallbacks(shared_ptr<OutboundCall> call,
- Connection *conn)
+ Connection* conn)
: call_(std::move(call)), conn_(conn) {}
void NotifyTransferFinished() override {
@@ -452,9 +455,9 @@ struct CallTransferCallbacks : public TransferCallbacks {
delete this;
}
- void NotifyTransferAborted(const Status &status) override {
- VLOG(1) << "Transfer of RPC call " << call_->ToString() << " aborted: "
- << status.ToString();
+ void NotifyTransferAborted(const Status& status) override {
+ VLOG(1) << Substitute(
+ "transfer of $0 aborted: $1", call_->ToString(), status.ToString());
delete this;
}
@@ -484,7 +487,7 @@ void Connection::QueueOutboundCall(shared_ptr<OutboundCall>
call) {
DCHECK(!call->cancellation_requested());
// Assign the call ID.
- int32_t call_id = GetNextCallId();
+ const int32_t call_id = GetNextCallId();
call->set_call_id(call_id);
// Serialize the actual bytes to be put on the wire.
@@ -501,7 +504,7 @@ void Connection::QueueOutboundCall(shared_ptr<OutboundCall>
call) {
car->call = call;
// Set up the timeout timer.
- const MonoDelta &timeout = call->controller()->timeout();
+ const auto& timeout = call->controller()->timeout();
if (timeout.Initialized()) {
reactor_thread_->RegisterTimeout(&car->timeout_timer);
car->timeout_timer.set<CallAwaitingResponse, // NOLINT(*)
@@ -544,7 +547,7 @@ void Connection::QueueOutboundCall(shared_ptr<OutboundCall>
call) {
car->timeout_timer.start();
}
- TransferCallbacks *cb = new CallTransferCallbacks(std::move(call), this);
+ TransferCallbacks* cb = new CallTransferCallbacks(std::move(call), this);
awaiting_response_[call_id] = car.release();
QueueOutbound(unique_ptr<OutboundTransfer>(
OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, cb)));
@@ -553,18 +556,17 @@ void
Connection::QueueOutboundCall(shared_ptr<OutboundCall> call) {
// Callbacks for sending an RPC call response from the server.
// This takes ownership of the InboundCall object so that, once it has
// been responded to, we can free up all of the associated memory.
-struct ResponseTransferCallbacks : public TransferCallbacks {
+struct ResponseTransferCallbacks final : public TransferCallbacks {
public:
- ResponseTransferCallbacks(unique_ptr<InboundCall> call,
- Connection *conn) :
- call_(std::move(call)),
- conn_(conn)
- {}
+ ResponseTransferCallbacks(unique_ptr<InboundCall> call, Connection* conn)
+ : call_(std::move(call)),
+ conn_(conn) {
+ }
- ~ResponseTransferCallbacks() {
+ ~ResponseTransferCallbacks() override {
// Remove the call from the map.
- InboundCall *call_from_map = EraseKeyReturnValuePtr(
- &conn_->calls_being_handled_, call_->call_id());
+ auto* call_from_map = EraseKeyReturnValuePtr(
+ &conn_->calls_being_handled_, call_->call_id());
DCHECK_EQ(call_from_map, call_.get());
}
@@ -573,38 +575,38 @@ struct ResponseTransferCallbacks : public
TransferCallbacks {
}
void NotifyTransferAborted(const Status& /*status*/) override {
- LOG(WARNING) << "Connection torn down before " <<
- call_->ToString() << " could send its response";
+ LOG(WARNING) << Substitute(
+ "$0 torn down before $1 could send its response",
+ conn_->ToString(), call_->ToString());
delete this;
}
private:
unique_ptr<InboundCall> call_;
- Connection *conn_;
+ Connection* conn_;
};
// Reactor task which puts a transfer on the outbound transfer queue.
class QueueTransferTask : public ReactorTask {
public:
- QueueTransferTask(unique_ptr<OutboundTransfer> transfer,
- Connection *conn)
- : transfer_(std::move(transfer)),
- conn_(conn)
- {}
+ QueueTransferTask(unique_ptr<OutboundTransfer> transfer, Connection* conn)
+ : transfer_(std::move(transfer)),
+ conn_(conn) {
+ }
void Run(ReactorThread* /*thr*/) override {
conn_->QueueOutbound(std::move(transfer_));
delete this;
}
- void Abort(const Status &status) override {
+ void Abort(const Status& status) override {
transfer_->Abort(status);
delete this;
}
private:
unique_ptr<OutboundTransfer> transfer_;
- Connection *conn_;
+ Connection* conn_;
};
void Connection::QueueResponseForCall(unique_ptr<InboundCall> call) {
@@ -621,15 +623,15 @@ void
Connection::QueueResponseForCall(unique_ptr<InboundCall> call) {
TransferPayload tmp_slices;
call->SerializeResponseTo(&tmp_slices);
- TransferCallbacks *cb = new ResponseTransferCallbacks(std::move(call), this);
+ TransferCallbacks* cb = new ResponseTransferCallbacks(std::move(call), this);
// After the response is sent, can delete the InboundCall object.
// We set a dummy call ID and required feature set, since these are not
needed
// when sending responses.
unique_ptr<OutboundTransfer> t(
OutboundTransfer::CreateForCallResponse(tmp_slices, cb));
- QueueTransferTask *task = new QueueTransferTask(std::move(t), this);
- reactor_thread_->reactor()->ScheduleReactorTask(task);
+ reactor_thread_->reactor()->ScheduleReactorTask(
+ new QueueTransferTask(std::move(t), this));
}
void Connection::set_confidential(bool is_confidential) {
@@ -646,10 +648,10 @@ RpczStore* Connection::rpcz_store() {
return reactor_thread_->reactor()->messenger()->rpcz_store();
}
-void Connection::ReadHandler(ev::io &watcher, int revents) {
+void Connection::ReadHandler(ev::io& /*watcher*/, int revents) {
DCHECK(reactor_thread_->IsCurrentThread());
- DVLOG(3) << ToString() << " ReadHandler(revents=" << revents << ")";
+ DVLOG(3) << Substitute("$0 ReadHandler(revents=$1)", ToString(), revents);
if (revents & EV_ERROR) {
reactor_thread_->DestroyConnection(this, Status::NetworkError(ToString() +
": ReadHandler encountered an error"));
@@ -665,25 +667,34 @@ void Connection::ReadHandler(ev::io &watcher, int
revents) {
Status status = inbound_->ReceiveBuffer(socket_.get(), &extra_buf);
if (PREDICT_FALSE(!status.ok())) {
if (status.posix_code() == ESHUTDOWN) {
- VLOG(1) << ToString() << " shut down by remote end.";
+ VLOG(1) << Substitute("$0 shut down by remote end", ToString());
} else {
- LOG(WARNING) << ToString() << " recv error: " << status.ToString();
+ LOG(WARNING) << Substitute("$0 recv error: $1",
+ ToString(), status.ToString());
}
reactor_thread_->DestroyConnection(this, status);
return;
}
if (!inbound_->TransferFinished()) {
- DVLOG(3) << ToString() << ": read is not yet finished yet.";
+ DVLOG(3) << Substitute("$0: read is not yet finished yet", ToString());
return;
}
- DVLOG(3) << ToString() << ": finished reading " << inbound_->data().size()
<< " bytes";
-
- if (direction_ == CLIENT) {
- HandleCallResponse(std::move(inbound_));
- } else if (direction_ == SERVER) {
- HandleIncomingCall(std::move(inbound_));
- } else {
- LOG(FATAL) << "Invalid direction: " << direction_;
+ DVLOG(3) << Substitute("$0: finished reading $1 bytes",
+ ToString(), inbound_->data().size());
+
+ switch (direction_) {
+ case CLIENT:
+ HandleCallResponse(std::move(inbound_));
+ break;
+
+ case SERVER:
+ HandleIncomingCall(std::move(inbound_));
+ break;
+
+ default:
+ LOG(DFATAL) << Substitute("$0: invalid direction",
+ static_cast<uint16_t>(direction_));
+ break;
}
if (extra_buf.size() > 0) {
@@ -699,19 +710,24 @@ void
Connection::HandleIncomingCall(unique_ptr<InboundTransfer> transfer) {
unique_ptr<InboundCall> call(new InboundCall(this));
Status s = call->ParseFrom(std::move(transfer));
- if (!s.ok()) {
- LOG(WARNING) << ToString() << ": received bad data: " << s.ToString();
- // TODO: shutdown? probably, since any future stuff on this socket will be
- // "unsynchronized"
+ if (PREDICT_FALSE(!s.ok())) {
+ LOG(WARNING) << Substitute("$0: received bad data: '$1'",
+ ToString(), s.ToString());
+ // Shutting down the connection since there is a high risk of
receiving
+ // "unsynchronized" data on this socket after this error.
+ Shutdown(s);
return;
}
- if (!InsertIfNotPresent(&calls_being_handled_, call->call_id(), call.get()))
{
- LOG(WARNING) << ToString() << ": received call ID " << call->call_id() <<
- " but was already processing this ID! Ignoring";
+ if (PREDICT_FALSE(!InsertIfNotPresent(&calls_being_handled_,
+ call->call_id(),
+ call.get()))) {
+ LOG(WARNING) << Substitute(
+ "$0: received call ID $1 but was already processing this ID, ignoring",
+ ToString(), call->call_id());
reactor_thread_->DestroyConnection(
- this, Status::RuntimeError("Received duplicate call id",
- Substitute("$0", call->call_id())));
+ this, Status::RuntimeError("Received duplicate call id",
+ Substitute("$0", call->call_id())));
return;
}
@@ -723,11 +739,12 @@ void
Connection::HandleCallResponse(unique_ptr<InboundTransfer> transfer) {
unique_ptr<CallResponse> resp(new CallResponse);
CHECK_OK(resp->ParseFrom(std::move(transfer)));
- CallAwaitingResponse *car_ptr =
- EraseKeyReturnValuePtr(&awaiting_response_, resp->call_id());
+ CallAwaitingResponse* car_ptr = EraseKeyReturnValuePtr(
+ &awaiting_response_, resp->call_id());
if (PREDICT_FALSE(car_ptr == nullptr)) {
- LOG(WARNING) << ToString() << ": Got a response for call id " <<
resp->call_id() << " which "
- << "was not pending! Ignoring.";
+ LOG(WARNING) << Substitute(
+ "$0: got a response for call id $1 which was not pending, ignoring",
+ ToString(), resp->call_id());
return;
}
@@ -736,8 +753,9 @@ void
Connection::HandleCallResponse(unique_ptr<InboundTransfer> transfer) {
if (PREDICT_FALSE(!car->call)) {
// The call already failed due to a timeout.
- VLOG(1) << "Got response to call id " << resp->call_id() << " after client
"
- << "already timed out or cancelled";
+ VLOG(1) << Substitute(
+ "got response to call id $0 after client already timed out or
cancelled",
+ resp->call_id());
return;
}
@@ -747,7 +765,7 @@ void
Connection::HandleCallResponse(unique_ptr<InboundTransfer> transfer) {
MaybeInjectCancellation(car->call);
}
-void Connection::WriteHandler(ev::io &watcher, int revents) {
+void Connection::WriteHandler(ev::io& /*watcher*/, int revents) {
DCHECK(reactor_thread_->IsCurrentThread());
if (revents & EV_ERROR) {
@@ -755,11 +773,12 @@ void Connection::WriteHandler(ev::io &watcher, int
revents) {
": writeHandler encountered an error"));
return;
}
- DVLOG(3) << ToString() << ": writeHandler: revents = " << revents;
+ DVLOG(3) << Substitute("$0: writeHandler: revents=$1", ToString(), revents);
if (outbound_transfers_.empty()) {
- LOG(WARNING) << ToString() << " got a ready-to-write callback, but there
is "
- "nothing to write.";
+ LOG(WARNING) << Substitute(
+ "$0 got a ready-to-write callback, but there is nothing to write",
+ ToString());
write_io_.stop();
return;
}
@@ -770,7 +789,7 @@ void Connection::WriteHandler(ev::io &watcher, int revents)
{
Connection::ProcessOutboundTransfersResult
Connection::ProcessOutboundTransfers() {
while (!outbound_transfers_.empty()) {
- OutboundTransfer* transfer = &(outbound_transfers_.front());
+ OutboundTransfer* transfer = &outbound_transfers_.front();
if (!transfer->TransferStarted()) {
if (transfer->is_for_outbound_call()) {
@@ -813,13 +832,14 @@ Connection::ProcessOutboundTransfersResult
Connection::ProcessOutboundTransfers(
last_activity_time_ = reactor_thread_->cur_time();
Status status = transfer->SendBuffer(socket_.get());
if (PREDICT_FALSE(!status.ok())) {
- LOG(WARNING) << ToString() << " send error: " << status.ToString();
+ LOG(WARNING) << Substitute(
+ "$0 send error: $1", ToString(), status.ToString());
reactor_thread_->DestroyConnection(this, status);
return kConnectionDestroyed;
}
if (!transfer->TransferFinished()) {
- DVLOG(3) << ToString() << ": writeHandler: xfer not finished.";
+ DVLOG(3) << Substitute("$0: writeHandler: xfer not finished",
ToString());
return kMoreToSend;
}
@@ -830,14 +850,14 @@ Connection::ProcessOutboundTransfersResult
Connection::ProcessOutboundTransfers(
return kNoMoreToSend;
}
-std::string Connection::ToString() const {
+string Connection::ToString() const {
// This may be called from other threads, so we cannot
// include anything in the output about the current state,
// which might concurrently change from another thread.
- return strings::Substitute(
- "$0 $1",
- direction_ == SERVER ? "server connection from" : "client connection to",
- remote_.ToString());
+ return Substitute("$0 $1",
+ direction_ == SERVER
+ ? "server connection from"
+ : "client connection to", remote_.ToString());
}
// Reactor task that transitions this Connection from connection negotiation to
@@ -847,22 +867,21 @@ class NegotiationCompletedTask : public ReactorTask {
NegotiationCompletedTask(Connection* conn,
Status negotiation_status,
std::unique_ptr<ErrorStatusPB> rpc_error)
- : conn_(conn),
- negotiation_status_(std::move(negotiation_status)),
- rpc_error_(std::move(rpc_error)) {
+ : conn_(conn),
+ negotiation_status_(std::move(negotiation_status)),
+ rpc_error_(std::move(rpc_error)) {
}
- void Run(ReactorThread *rthread) override {
+ void Run(ReactorThread* rthread) override {
rthread->CompleteConnectionNegotiation(conn_,
negotiation_status_,
std::move(rpc_error_));
delete this;
}
- void Abort(const Status &status) override {
+ void Abort(const Status& status) override {
DCHECK(conn_->reactor_thread()->reactor()->closing());
- VLOG(1) << "Failed connection negotiation due to shut down reactor thread:
"
- << status.ToString();
+ VLOG(1) << Substitute("connection negotiation aborted: $0",
status.ToString());
delete this;
}
@@ -885,7 +904,7 @@ void Connection::MarkNegotiationComplete() {
}
Status Connection::DumpPB(const DumpConnectionsRequestPB& req,
- RpcConnectionPB* resp) {
+ RpcConnectionPB* resp) const {
DCHECK(reactor_thread_->IsCurrentThread());
resp->set_remote_ip(remote_.ToString());
if (negotiation_complete_) {
@@ -894,27 +913,31 @@ Status Connection::DumpPB(const DumpConnectionsRequestPB&
req,
resp->set_state(RpcConnectionPB::NEGOTIATING);
}
- if (direction_ == CLIENT) {
- for (const car_map_t::value_type& entry : awaiting_response_) {
- CallAwaitingResponse* c = entry.second;
- if (c->call) {
- c->call->DumpPB(req, resp->add_calls_in_flight());
+ switch (direction_) {
+ case CLIENT:
+ for (const auto& [_, c]: awaiting_response_) {
+ if (c->call) {
+ c->call->DumpPB(req, resp->add_calls_in_flight());
+ }
}
- }
+ resp->set_outbound_queue_size(outbound_transfers_.size());
+ break;
- resp->set_outbound_queue_size(num_queued_outbound_transfers());
- } else if (direction_ == SERVER) {
- if (negotiation_complete_) {
- // It's racy to dump credentials while negotiating, since the Connection
- // object is owned by the negotiation thread at that point.
- resp->set_remote_user_credentials(remote_user_.ToString());
- }
- for (const inbound_call_map_t::value_type& entry : calls_being_handled_) {
- InboundCall* c = entry.second;
- c->DumpPB(req, resp->add_calls_in_flight());
- }
- } else {
- LOG(FATAL);
+ case SERVER:
+ if (negotiation_complete_) {
+ // It's racy to dump credentials while negotiating, since the
Connection
+ // object is owned by the negotiation thread at that point.
+ resp->set_remote_user_credentials(remote_user_.ToString());
+ }
+ for (const auto& [_, c]: calls_being_handled_) {
+ c->DumpPB(req, resp->add_calls_in_flight());
+ }
+ break;
+
+ default:
+ LOG(DFATAL) << Substitute("$0: invalid direction",
+ static_cast<uint16_t>(direction_));
+ break;
}
#ifdef __linux__
if (negotiation_complete_ && remote_.is_ip()) {
@@ -937,11 +960,10 @@ Status Connection::DumpPB(const DumpConnectionsRequestPB&
req,
Status Connection::GetSocketStatsPB(SocketStatsPB* pb) const {
DCHECK(reactor_thread_->IsCurrentThread());
int fd = socket_->GetFd();
- CHECK_GE(fd, 0);
+ DCHECK_GE(fd, 0);
// Fetch TCP_INFO statistics from the kernel.
- tcp_info ti;
- memset(&ti, 0, sizeof(ti));
+ tcp_info ti = {};
socklen_t len = sizeof(ti);
int rc = getsockopt(fd, IPPROTO_TCP, TCP_INFO, &ti, &len);
if (rc == 0) {
@@ -1026,12 +1048,12 @@ Status
Connection::GetTransportDetailsPB(TransportDetailsPB* pb) const {
tls->set_cipher_suite(tls_socket->GetCipherDescription());
}
- int fd = socket_->GetFd();
- CHECK_GE(fd, 0);
+ const int fd = socket_->GetFd();
+ DCHECK_GE(fd, 0);
int32_t max_seg_size = 0;
socklen_t optlen = sizeof(max_seg_size);
int ret = ::getsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, &max_seg_size, &optlen);
- if (ret) {
+ if (PREDICT_FALSE(ret)) {
int err = errno;
return Status::NetworkError(
"getsockopt(TCP_MAXSEG) failed", ErrnoToString(err), err);
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index d0afbe536..eab48f54e 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -16,7 +16,6 @@
// under the License.
#pragma once
-#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
@@ -89,8 +88,8 @@ class Connection : public RefCountedThreadSafe<Connection> {
// remote: the address of the remote end
// socket: the socket to take ownership of.
// direction: whether we are the client or server side
- Connection(ReactorThread *reactor_thread,
- Sockaddr remote,
+ Connection(ReactorThread* reactor_thread,
+ const Sockaddr& remote,
std::unique_ptr<Socket> socket,
Direction direction,
CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS);
@@ -138,7 +137,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
// Cancel an outbound call by removing any reference to it by
CallAwaitingResponse
// in 'awaiting_responses_'.
- void CancelOutboundCall(const std::shared_ptr<OutboundCall> &call);
+ void CancelOutboundCall(const std::shared_ptr<OutboundCall>& call);
// The address of the remote end of the connection.
const Sockaddr& remote() const { return remote_; }
@@ -184,10 +183,10 @@ class Connection : public
RefCountedThreadSafe<Connection> {
RpczStore* rpcz_store();
// libev callback when data is available to read.
- void ReadHandler(ev::io &watcher, int revents);
+ void ReadHandler(ev::io& watcher, int revents); //
NOLINT(google-runtime-references)
// libev callback when we may write to the socket.
- void WriteHandler(ev::io &watcher, int revents);
+ void WriteHandler(ev::io& watcher, int revents);//
NOLINT(google-runtime-references)
enum ProcessOutboundTransfersResult {
// All of the transfers in the queue have been sent successfully.
@@ -224,7 +223,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
void MarkNegotiationComplete();
Status DumpPB(const DumpConnectionsRequestPB& req,
- RpcConnectionPB* resp);
+ RpcConnectionPB* resp) const;
ReactorThread* reactor_thread() const { return reactor_thread_; }
@@ -263,10 +262,6 @@ class Connection : public RefCountedThreadSafe<Connection>
{
scheduled_for_shutdown_ = true;
}
- size_t num_queued_outbound_transfers() const {
- return outbound_transfers_.size();
- }
-
private:
friend struct CallAwaitingResponse;
friend class QueueTransferTask;
@@ -279,9 +274,9 @@ class Connection : public RefCountedThreadSafe<Connection> {
~CallAwaitingResponse();
// Notification from libev that the call has timed out.
- void HandleTimeout(ev::timer &watcher, int revents);
+ void HandleTimeout(ev::timer& watcher, int revents); //
NOLINT(google-runtime-references)
- Connection *conn;
+ Connection* conn;
std::shared_ptr<OutboundCall> call;
ev::timer timeout_timer;
@@ -312,7 +307,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
// The given CallAwaitingResponse has elapsed its user-defined timeout.
// Set it to Failed.
- void HandleOutboundCallTimeout(CallAwaitingResponse *car);
+ void HandleOutboundCallTimeout(CallAwaitingResponse* car);
// Queue a transfer for sending on this connection.
// We will take ownership of the transfer.
@@ -321,7 +316,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
// Internal test function for injecting cancellation request when 'call'
// reaches state specified in 'FLAGS_rpc_inject_cancellation_state'.
- void MaybeInjectCancellation(const std::shared_ptr<OutboundCall> &call);
+ void MaybeInjectCancellation(const std::shared_ptr<OutboundCall>& call);
Status GetSocketStatsPB(SocketStatsPB* pb) const;
@@ -344,7 +339,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
RemoteUser remote_user_;
// whether we are client or server
- Direction direction_;
+ const Direction direction_;
// The last time we read or wrote from the socket.
MonoTime last_activity_time_;
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 6820490b3..df1046005 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -269,9 +269,8 @@ Status Messenger::UnregisterService(const string&
service_name) {
return Status::OK();
}
-void Messenger::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
- Reactor *reactor = RemoteToReactor(call->conn_id().remote());
- reactor->QueueOutboundCall(call);
+void Messenger::QueueOutboundCall(const shared_ptr<OutboundCall>& call) {
+ RemoteToReactor(call->conn_id().remote())->QueueOutboundCall(call);
}
void Messenger::QueueInboundCall(unique_ptr<InboundCall> call) {
@@ -304,17 +303,16 @@ void Messenger::QueueInboundCall(unique_ptr<InboundCall>
call) {
WARN_NOT_OK((*service)->QueueInboundCall(std::move(call)), "Unable to handle
RPC call");
}
-void Messenger::QueueCancellation(const shared_ptr<OutboundCall> &call) {
- Reactor *reactor = RemoteToReactor(call->conn_id().remote());
+void Messenger::QueueCancellation(const shared_ptr<OutboundCall>& call) {
+ Reactor* reactor = RemoteToReactor(call->conn_id().remote());
reactor->QueueCancellation(call);
}
-void Messenger::RegisterInboundSocket(Socket *new_socket, const Sockaddr
&remote) {
- Reactor *reactor = RemoteToReactor(remote);
- reactor->RegisterInboundSocket(new_socket, remote);
+void Messenger::RegisterInboundSocket(Socket* new_socket, const Sockaddr&
remote) {
+ RemoteToReactor(remote)->RegisterInboundSocket(new_socket, remote);
}
-Messenger::Messenger(const MessengerBuilder &bld)
+Messenger::Messenger(const MessengerBuilder& bld)
: name_(bld.name_),
state_(kStarted),
authentication_(RpcAuthentication::REQUIRED),
@@ -352,7 +350,7 @@ Messenger::~Messenger() {
STLDeleteElements(&reactors_);
}
-Reactor* Messenger::RemoteToReactor(const Sockaddr& remote) {
+Reactor* Messenger::RemoteToReactor(const Sockaddr& remote) const {
// This is just a static partitioning; we could get a lot
// fancier with assigning Sockaddrs to Reactors.
return reactors_[remote.HashCode() % reactors_.size()];
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 753681d46..a8044fca4 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -203,7 +203,7 @@ class MessengerBuilder {
// list and
// https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_set_ciphersuites.html
// for SSL_CTX_set_ciphersuites() API details.
- MessengerBuilder &set_rpc_tls_ciphersuites(
+ MessengerBuilder& set_rpc_tls_ciphersuites(
const std::string& rpc_tls_ciphersuites) {
rpc_tls_ciphersuites_ = rpc_tls_ciphersuites;
return *this;
@@ -211,7 +211,7 @@ class MessengerBuilder {
// Set the minimum protocol version to allow when for securing RPC
connections
// with TLS. May be one of 'TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3'.
- MessengerBuilder &set_rpc_tls_min_protocol(
+ MessengerBuilder& set_rpc_tls_min_protocol(
const std::string& rpc_tls_min_protocol) {
rpc_tls_min_protocol_ = rpc_tls_min_protocol;
return *this;
@@ -473,7 +473,7 @@ class Messenger {
explicit Messenger(const MessengerBuilder& bld);
- Reactor* RemoteToReactor(const Sockaddr& remote);
+ Reactor* RemoteToReactor(const Sockaddr& remote) const;
Status Init();
void RunTimeoutThread();
void UpdateCurTime();
diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc
index 9501a388a..b6b355960 100644
--- a/src/kudu/rpc/proxy.cc
+++ b/src/kudu/rpc/proxy.cc
@@ -20,6 +20,7 @@
#include <functional>
#include <iostream>
#include <memory>
+#include <type_traits>
#include <utility>
#include <vector>
@@ -27,7 +28,7 @@
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/messenger.h" // IWYU pragma: keep
#include "kudu/rpc/outbound_call.h"
#include "kudu/rpc/remote_method.h"
#include "kudu/rpc/response_callback.h"
@@ -43,6 +44,7 @@
#include "kudu/util/user.h"
using google::protobuf::Message;
+using std::make_shared;
using std::string;
using std::shared_ptr;
using std::unique_ptr;
@@ -52,7 +54,7 @@ using strings::Substitute;
namespace kudu {
namespace rpc {
-Proxy::Proxy(std::shared_ptr<Messenger> messenger,
+Proxy::Proxy(shared_ptr<Messenger> messenger,
const Sockaddr& remote,
string hostname,
string service_name)
@@ -60,16 +62,16 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger,
dns_resolver_(nullptr),
messenger_(std::move(messenger)),
is_started_(false) {
- CHECK(messenger_ != nullptr);
+ DCHECK(messenger_);
DCHECK(!service_name_.empty()) << "Proxy service name must not be blank";
DCHECK(remote.is_initialized());
// By default, we set the real user to the currently logged-in user.
// Effective user and password remain blank.
string real_user;
- Status s = GetLoggedInUser(&real_user);
- if (!s.ok()) {
- LOG(WARNING) << "Proxy for " << service_name_ << ": Unable to get logged-in user name: "
- << s.ToString() << " before connecting to remote: " << remote.ToString();
+ if (auto s = GetLoggedInUser(&real_user); !s.ok()) {
+ LOG(WARNING) << Substitute("$0: unable to get logged-in username "
+ "before connecting to remote $1 via $2 proxy",
+ s.ToString(), remote.ToString(), service_name_);
}
UserCredentials creds;
@@ -77,7 +79,7 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger,
conn_id_ = ConnectionId(remote, std::move(hostname), std::move(creds));
}
-Proxy::Proxy(std::shared_ptr<Messenger> messenger,
+Proxy::Proxy(shared_ptr<Messenger> messenger,
HostPort hp,
DnsResolver* dns_resolver,
string service_name)
@@ -86,12 +88,12 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger,
dns_resolver_(dns_resolver),
messenger_(std::move(messenger)),
is_started_(false) {
- CHECK(messenger_ != nullptr);
+ DCHECK(messenger_);
DCHECK(!service_name_.empty()) << "Proxy service name must not be blank";
DCHECK(hp_.Initialized());
}
-Sockaddr* Proxy::GetSingleSockaddr(std::vector<Sockaddr>* addrs) const {
+Sockaddr* Proxy::GetSingleSockaddr(vector<Sockaddr>* addrs) const {
DCHECK(!addrs->empty());
if (PREDICT_FALSE(addrs->size() > 1)) {
LOG(WARNING) << Substitute(
@@ -110,8 +112,9 @@ void Proxy::Init(Sockaddr addr) {
string real_user;
Status s = GetLoggedInUser(&real_user);
if (!s.ok()) {
- LOG(WARNING) << "Proxy for " << service_name_ << ": Unable to get logged-in user name: "
- << s.ToString() << " before connecting to host/port: " << hp_.ToString();
+ LOG(WARNING) << Substitute(
+ "$0: unable to get logged-in username before connecting to $1 via $2 proxy",
+ s.ToString(), hp_.ToString(), service_name_);
}
vector<Sockaddr> addrs;
if (!addr.is_initialized()) {
@@ -141,9 +144,14 @@ void Proxy::EnqueueRequest(const string& method,
OutboundCall::CallbackBehavior cb_behavior) const {
ConnectionId connection = conn_id();
DCHECK(connection.remote().is_initialized());
- controller->call_.reset(
- new OutboundCall(connection, {service_name_, method}, std::move(req_payload),
- cb_behavior, response, controller, callback));
+ controller->call_ = make_shared<OutboundCall>(
+ connection,
+ RemoteMethod{service_name_, method},
+ std::move(req_payload),
+ cb_behavior,
+ response,
+ controller,
+ callback);
controller->SetMessenger(messenger_.get());
// If this fails to queue, the callback will get called immediately
@@ -151,7 +159,7 @@ void Proxy::EnqueueRequest(const string& method,
messenger_->QueueOutboundCall(controller->call_);
}
-void Proxy::RefreshDnsAndEnqueueRequest(const std::string& method,
+void Proxy::RefreshDnsAndEnqueueRequest(const string& method,
unique_ptr<RequestPayload> req_payload,
google::protobuf::Message* response,
RpcController* controller,
@@ -169,7 +177,7 @@ void Proxy::RefreshDnsAndEnqueueRequest(const std::string& method,
// NOTE: we need to keep a reference here because the callback may end up
// destructing the controller and the outbound call, _while_ the callback
// is running from within the call!
- auto shared_call = std::make_shared<OutboundCall>(
+ auto shared_call = make_shared<OutboundCall>(
conn_id(), RemoteMethod{service_name_, method}, response,
controller, callback);
controller->call_ = shared_call;
controller->call_->SetFailed(s.CloneAndPrepend("failed to refresh physical address"));
@@ -194,7 +202,7 @@ void Proxy::AsyncRequest(const string& method,
google::protobuf::Message* response,
RpcController* controller,
const ResponseCallback& callback) {
- CHECK(!controller->call_) << "Controller should be reset";
+ DCHECK(!controller->call_) << "Controller should be reset";
base::subtle::NoBarrier_Store(&is_started_, true);
// TODO(awong): it would be great if we didn't have to heap allocate the
// payload.
@@ -261,18 +269,18 @@ Status Proxy::SyncRequest(const string& method,
}
void Proxy::set_user_credentials(UserCredentials user_credentials) {
- CHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
- << "It is illegal to call set_user_credentials() after request processing has started";
+ DCHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
+ << "illegal to call set_user_credentials() after request processing has started";
conn_id_.set_user_credentials(std::move(user_credentials));
}
void Proxy::set_network_plane(string network_plane) {
- CHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
- << "It is illegal to call set_network_plane() after request processing has started";
+ DCHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
+ << "illegal to call set_network_plane() after request processing has started";
conn_id_.set_network_plane(std::move(network_plane));
}
-std::string Proxy::ToString() const {
+string Proxy::ToString() const {
return Substitute("$0@$1", service_name_, conn_id_.ToString());
}
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 64ef18861..815119ed8 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -72,6 +72,7 @@ static const int kDefaultLibEvFlags = ev::KQUEUE;
static const int kDefaultLibEvFlags = ev::AUTO;
#endif
+using std::function;
using std::string;
using std::shared_ptr;
using std::unique_ptr;
@@ -233,7 +234,7 @@ void ReactorThread::PollCompleteCb(struct ev_loop* loop) noexcept {
}
void ReactorThread::Shutdown(Messenger::ShutdownMode mode) {
- CHECK(reactor_->closing()) << "Should be called after setting closing_ flag";
+ DCHECK(reactor_->closing()) << "Should be called after setting closing_ flag";
VLOG(1) << name() << ": shutting down Reactor thread.";
WakeThread();
@@ -241,7 +242,11 @@ void ReactorThread::Shutdown(Messenger::ShutdownMode mode) {
if (mode == Messenger::ShutdownMode::SYNC) {
// Join() will return a bad status if asked to join on the currently
// running thread.
- CHECK_OK(ThreadJoiner(thread_.get()).Join());
+ const auto s = ThreadJoiner(thread_.get()).Join();
+ if (PREDICT_FALSE(!s.ok())) {
+ LOG(DFATAL) << Substitute(
+ "$0: failed to join $1", s.ToString(), thread_->ToString());
+ }
}
}
@@ -707,7 +712,8 @@ void ReactorThread::DestroyConnection(Connection* conn,
// Unlink connection from lists.
if (conn->direction() == Connection::CLIENT) {
const auto range =
client_conns_.equal_range(conn->outbound_connection_id());
- CHECK(range.first != range.second) << "Couldn't find connection " << conn->ToString();
+ DCHECK(range.first != range.second)
+ << "couldn't find connection " << conn->ToString();
// The client_conns_ container is a multi-map.
for (auto it = range.first; it != range.second;) {
if (it->second.get() == conn) {
@@ -728,8 +734,7 @@ void ReactorThread::DestroyConnection(Connection* conn,
}
}
-DelayedTask::DelayedTask(std::function<void(const Status&)> func,
- MonoDelta when)
+DelayedTask::DelayedTask(function<void(const Status&)> func, MonoDelta when)
: func_(std::move(func)),
when_(when),
thread_(nullptr) {
@@ -821,7 +826,7 @@ bool Reactor::closing() const {
// Task to call an arbitrary function within the reactor thread.
class RunFunctionTask : public ReactorTask {
public:
- explicit RunFunctionTask(std::function<Status()> f)
+ explicit RunFunctionTask(function<Status()> f)
: function_(std::move(f)), latch_(1) {}
void Run(ReactorThread* /*reactor*/) override {
@@ -841,7 +846,7 @@ class RunFunctionTask : public ReactorTask {
}
private:
- const std::function<Status()> function_;
+ const function<Status()> function_;
Status status_;
CountDownLatch latch_;
};
@@ -850,7 +855,7 @@ Status Reactor::GetMetrics(ReactorMetrics* metrics) {
return RunOnReactorThread([&]() { return this->thread_.GetMetrics(metrics);
});
}
-Status Reactor::RunOnReactorThread(std::function<Status()> f) {
+Status Reactor::RunOnReactorThread(function<Status()> f) {
RunFunctionTask task(std::move(f));
ScheduleReactorTask(&task);
return task.Wait();
@@ -950,7 +955,7 @@ void Reactor::ScheduleReactorTask(ReactorTask* task) {
bool was_empty;
{
std::unique_lock<LockType> l(lock_);
- if (closing_) {
+ if (PREDICT_FALSE(closing_)) {
// We guarantee the reactor lock is not taken when calling Abort().
l.unlock();
task->Abort(ShutdownError(false));
diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h
index 8a966c01f..8944a2b2d 100644
--- a/src/kudu/rpc/reactor.h
+++ b/src/kudu/rpc/reactor.h
@@ -173,12 +173,12 @@ class ReactorThread {
void RegisterTimeout(ev::timer* watcher);
// This may be called from another thread.
- const std::string &name() const;
+ const std::string& name() const;
MonoTime cur_time() const;
// This may be called from another thread.
- Reactor *reactor();
+ Reactor* reactor();
// Return true if this reactor thread is the thread currently
// running. Should be used in DCHECK assertions.
@@ -196,7 +196,7 @@ class ReactorThread {
// Collect metrics.
// Must be called from the reactor thread.
- Status GetMetrics(ReactorMetrics *metrics);
+ Status GetMetrics(ReactorMetrics* metrics);
private:
friend class AssignOutboundCallTask;
@@ -302,7 +302,7 @@ class ReactorThread {
// List of current connections coming into the server.
conn_list_t server_conns_;
- Reactor *reactor_;
+ Reactor* reactor_;
// If a connection has been idle for this much time, it is torn down.
const MonoDelta connection_keepalive_time_;
diff --git a/src/kudu/util/net/socket.cc b/src/kudu/util/net/socket.cc
index 1ce2da66d..d92c349b7 100644
--- a/src/kudu/util/net/socket.cc
+++ b/src/kudu/util/net/socket.cc
@@ -30,6 +30,7 @@
#include <limits>
#include <ostream>
#include <string>
+#include <utility>
#include <gflags/gflags.h>
#include <glog/logging.h>
@@ -61,20 +62,62 @@ TAG_FLAG(socket_inject_short_recvs, hidden);
TAG_FLAG(socket_inject_short_recvs, unsafe);
using std::string;
+using std::numeric_limits;
using strings::Substitute;
namespace kudu {
+namespace {
+
+Status ParseIpAddress(const string& addr_str, Sockaddr* result) {
+ DCHECK(!addr_str.empty());
+ Sockaddr bind_host;
+ const auto s = bind_host.ParseString(addr_str, 0);
+ if (PREDICT_FALSE(!s.ok() || bind_host.port() != 0)) {
+ if (!s.ok()) {
+ return Status::InvalidArgument(
+ Substitute("$0: invalid local IP address", addr_str), s.ToString());
+ }
+ return Status::InvalidArgument(
+ Substitute("$0: unexpected port with IP address", addr_str));
+ }
+
+ if (result) {
+ *result = std::move(bind_host);
+ }
+ return Status::OK();
+}
+
+bool ValidateLocalIpForOutboundSockets(
+ const char* flagname, const string& value) {
+ if (value.empty()) {
+ // The default value should pass the validation.
+ return true;
+ }
+
+ if (auto s = ParseIpAddress(value, nullptr); !s.ok()) {
+ LOG(ERROR) << Substitute("invalid local IP '$0' for --$1: $2",
+ value, flagname, s.ToString());
+ return false;
+ }
+ return true;
+}
+DEFINE_validator(local_ip_for_outbound_sockets,
+ &ValidateLocalIpForOutboundSockets);
+
+} // anonymous namespace
+
+
Socket::Socket()
- : fd_(-1) {
+ : fd_(-1) {
}
Socket::Socket(int fd)
- : fd_(fd) {
+ : fd_(fd) {
}
Socket::Socket(Socket&& other) noexcept
- : fd_(other.Release()) {
+ : fd_(other.Release()) {
}
void Socket::Reset(int fd) {
@@ -259,12 +302,11 @@ Status Socket::SetReusePort(bool flag) {
#endif
}
-Status Socket::BindAndListen(const Sockaddr &sockaddr,
+Status Socket::BindAndListen(const Sockaddr& sockaddr,
int listen_queue_size) {
RETURN_NOT_OK(SetReuseAddr(true));
RETURN_NOT_OK(Bind(sockaddr));
- RETURN_NOT_OK(Listen(listen_queue_size));
- return Status::OK();
+ return Listen(listen_queue_size);
}
Status Socket::Listen(int listen_queue_size) {
@@ -275,7 +317,7 @@ Status Socket::Listen(int listen_queue_size) {
return Status::OK();
}
-Status Socket::GetSocketAddress(Sockaddr *cur_addr) const {
+Status Socket::GetSocketAddress(Sockaddr* cur_addr) const {
struct sockaddr_storage ss;
socklen_t len = sizeof(ss);
DCHECK_GE(fd_, 0);
@@ -287,7 +329,7 @@ Status Socket::GetSocketAddress(Sockaddr *cur_addr) const {
return Status::OK();
}
-Status Socket::GetPeerAddress(Sockaddr *cur_addr) const {
+Status Socket::GetPeerAddress(Sockaddr* cur_addr) const {
struct sockaddr_storage addr;
socklen_t len = sizeof(addr);
DCHECK_GE(fd_, 0);
@@ -332,7 +374,7 @@ Status Socket::Bind(const Sockaddr& bind_addr) {
return Status::OK();
}
-Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) {
+Status Socket::Accept(Socket* new_conn, Sockaddr* remote, int flags) {
TRACE_EVENT0("net", "Socket::Accept");
struct sockaddr_storage addr;
socklen_t olen = sizeof(addr);
@@ -371,16 +413,11 @@ Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) {
Status Socket::BindForOutgoingConnection() {
Sockaddr bind_host;
- Status s = bind_host.ParseString(FLAGS_local_ip_for_outbound_sockets, 0);
- CHECK(s.ok() && bind_host.port() == 0)
- << "Invalid local IP set for 'local_ip_for_outbound_sockets': '"
- << FLAGS_local_ip_for_outbound_sockets << "': " << s.ToString();
-
- RETURN_NOT_OK(Bind(bind_host));
- return Status::OK();
+ RETURN_NOT_OK(ParseIpAddress(FLAGS_local_ip_for_outbound_sockets, &bind_host));
+ return Bind(bind_host);
}
-Status Socket::Connect(const Sockaddr &remote) {
+Status Socket::Connect(const Sockaddr& remote) {
TRACE_EVENT1("net", "Socket::Connect",
"remote", remote.ToString());
if (PREDICT_FALSE(!FLAGS_local_ip_for_outbound_sockets.empty())) {
@@ -412,7 +449,7 @@ Status Socket::GetSockError() const {
return Status::OK();
}
-Status Socket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
+Status Socket::Write(const uint8_t* buf, int32_t amt, int32_t* nwritten) {
if (amt <= 0) {
return Status::NetworkError(
StringPrintf("invalid send of %" PRId32 " bytes",
@@ -429,8 +466,9 @@ Status Socket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
return Status::OK();
}
-Status Socket::Writev(const struct ::iovec *iov, int iov_len,
- int64_t *nwritten) {
+Status Socket::Writev(const struct ::iovec* iov,
+ int iov_len,
+ int64_t* nwritten) {
if (PREDICT_FALSE(iov_len <= 0)) {
return Status::NetworkError(
StringPrintf("writev: invalid io vector length of %d",
@@ -441,7 +479,7 @@ Status Socket::Writev(const struct ::iovec *iov, int iov_len,
struct msghdr msg;
memset(&msg, 0, sizeof(struct msghdr));
- msg.msg_iov = const_cast<iovec *>(iov);
+ msg.msg_iov = const_cast<iovec*>(iov);
msg.msg_iovlen = iov_len;
ssize_t res;
RETRY_ON_EINTR(res, ::sendmsg(fd_, &msg, MSG_NOSIGNAL));
@@ -455,9 +493,9 @@ Status Socket::Writev(const struct ::iovec *iov, int iov_len,
}
// Mostly follows writen() from Stevens (2004) or Kerrisk (2010).
-Status Socket::BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten,
+Status Socket::BlockingWrite(const uint8_t* buf, size_t buflen, size_t* nwritten,
const MonoTime& deadline) {
- DCHECK_LE(buflen, std::numeric_limits<int32_t>::max()) << "Writes > INT32_MAX not supported";
+ DCHECK_LE(buflen, numeric_limits<int32_t>::max()) << "Writes > INT32_MAX not supported";
DCHECK(nwritten);
size_t tot_written = 0;
@@ -492,15 +530,15 @@ Status Socket::BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten
}
}
- if (tot_written < buflen) {
+ if (PREDICT_FALSE(tot_written < buflen)) {
return Status::IOError("Wrote zero bytes on a BlockingWrite() call",
StringPrintf("Transferred %zu of %zu bytes", tot_written, buflen));
}
return Status::OK();
}
-Status Socket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
- if (amt <= 0) {
+Status Socket::Recv(uint8_t* buf, int32_t amt, int32_t* nread) {
+ if (PREDICT_FALSE(amt <= 0)) {
return Status::NetworkError(
StringPrintf("invalid recv of %d bytes", amt), Slice(), EINVAL);
}
@@ -535,8 +573,8 @@ Status Socket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
// Mostly follows readn() from Stevens (2004) or Kerrisk (2010).
// One place where we deviate: we consider EOF a failure if < amt bytes are read.
-Status Socket::BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline) {
- DCHECK_LE(amt, std::numeric_limits<int32_t>::max()) << "Reads > INT32_MAX not supported";
+Status Socket::BlockingRecv(uint8_t* buf, size_t amt, size_t* nread, const MonoTime& deadline) {
+ DCHECK_LE(amt, numeric_limits<int32_t>::max()) << "Reads > INT32_MAX not supported";
DCHECK(nread);
size_t tot_read = 0;
while (tot_read < amt) {
diff --git a/src/kudu/util/net/socket.h b/src/kudu/util/net/socket.h
index 830a389e9..2ec3b071c 100644
--- a/src/kudu/util/net/socket.h
+++ b/src/kudu/util/net/socket.h
@@ -14,8 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#ifndef KUDU_UTIL_NET_SOCKET_H
-#define KUDU_UTIL_NET_SOCKET_H
+#pragma once
#include <cstddef>
#include <cstdint>
@@ -94,18 +93,18 @@ class Socket {
// 1) SetReuseAddr(true)
// 2) Bind()
// 3) Listen()
- Status BindAndListen(const Sockaddr &sockaddr, int listen_queue_size);
+ Status BindAndListen(const Sockaddr& sockaddr, int listen_queue_size);
// Start listening for new connections, with the given backlog size.
// Requires that the socket has already been bound using Bind().
Status Listen(int listen_queue_size);
// Call getsockname to get the address of this socket.
- Status GetSocketAddress(Sockaddr *cur_addr) const;
+ Status GetSocketAddress(Sockaddr* cur_addr) const;
// Call getpeername to get the address of the connected peer.
// It is virtual so that tests can override.
- virtual Status GetPeerAddress(Sockaddr *cur_addr) const;
+ virtual Status GetPeerAddress(Sockaddr* cur_addr) const;
// Return true if this socket is determined to be a loopback connection
// (i.e. the local and remote peer share an IP address).
@@ -119,10 +118,10 @@ class Socket {
Status Bind(const Sockaddr& bind_addr);
// Call accept(2) to get a new connection.
- Status Accept(Socket *new_conn, Sockaddr *remote, int flags);
+ Status Accept(Socket* new_conn, Sockaddr* remote, int flags);
// start connecting this socket to a remote address.
- Status Connect(const Sockaddr &remote);
+ Status Connect(const Sockaddr& remote);
// get the error status using getsockopt(2)
Status GetSockError() const;
@@ -130,30 +129,30 @@ class Socket {
// Write up to 'amt' bytes from 'buf' to the socket. The number of bytes
// actually written will be stored in 'nwritten'. If an error is returned,
// the value of 'nwritten' is undefined.
- virtual Status Write(const uint8_t *buf, int32_t amt, int32_t *nwritten);
+ virtual Status Write(const uint8_t* buf, int32_t amt, int32_t* nwritten);
// Vectorized Write.
// If there is an error, that error needs to be resolved before calling again.
// If there was no error, but not all the bytes were written, the unwritten
// bytes must be retried. See writev(2) for more information.
- virtual Status Writev(const struct ::iovec *iov, int iov_len, int64_t *nwritten);
+ virtual Status Writev(const struct ::iovec* iov, int iov_len, int64_t* nwritten);
// Blocking Write call, returns IOError unless full buffer is sent.
// Underlying Socket expected to be in blocking mode. Fails if any Write() sends 0 bytes.
// Returns OK if buflen bytes were sent, otherwise IOError.
// Upon return, nwritten will contain the number of bytes actually written.
// See also writen() from Stevens (2004) or Kerrisk (2010)
- Status BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten,
+ Status BlockingWrite(const uint8_t* buf, size_t buflen, size_t* nwritten,
const MonoTime& deadline);
- virtual Status Recv(uint8_t *buf, int32_t amt, int32_t *nread);
+ virtual Status Recv(uint8_t* buf, int32_t amt, int32_t* nread);
// Blocking Recv call, returns IOError unless requested amt bytes are read.
// Underlying Socket expected to be in blocking mode. Fails if any Recv() reads 0 bytes.
// Returns OK if amt bytes were read, otherwise IOError.
// Upon return, nread will contain the number of bytes actually read.
// See also readn() from Stevens (2004) or Kerrisk (2010)
- Status BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline);
+ Status BlockingRecv(uint8_t* buf, size_t amt, size_t* nread, const MonoTime& deadline);
// Enable TCP keepalive for the underlying socket. A TCP keepalive probe will be sent
// to the remote end after the connection has been idle for 'idle_time_s' seconds.
@@ -183,5 +182,3 @@ class Socket {
};
} // namespace kudu
-
-#endif