This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new fbde237 Add basic support for UNIX domain sockets
fbde237 is described below
commit fbde23718211f9912067d33fd09b4177263263a1
Author: Todd Lipcon <[email protected]>
AuthorDate: Thu Apr 9 14:04:20 2020 -0700
Add basic support for UNIX domain sockets
This adds support to Sockaddr to represent UNIX domain sockets, and
fixes some code in the RPC stack and Socket classes that previously
assumed sockaddr_in.
This updates rpc-test to run all of the cases over both TCP and UNIX
sockets. It also adds a simple performance test of running RPC over a
UNIX domain socket.
The performance difference is small but measurable. The test has a lot
of variability depending on whether the reactor threads, test thread,
and RPC worker thread get scheduled to the same core, different
hyperthreads of the same core, same vs different NUMA nodes, etc. To
eliminate that variability, I ran the performance test pinned to a
single CPU using 'taskset -c 1':
I0409 15:15:50.290123 42405 rpc-test.cc:1529] Connecting to 0.0.0.0:33635
I0409 15:15:50.646266 42405 rpc-test.cc:1543] Sending 1024MB via tcp socket: real 0.355s user 0.110s sys 0.245s
I0409 15:15:51.048060 42405 rpc-test.cc:1543] Sending 1024MB via tcp socket: real 0.402s user 0.106s sys 0.295s
I0409 15:15:51.455765 42405 rpc-test.cc:1543] Sending 1024MB via tcp socket: real 0.408s user 0.120s sys 0.287s
I0409 15:15:51.861732 42405 rpc-test.cc:1543] Sending 1024MB via tcp socket: real 0.406s user 0.112s sys 0.293s
I0409 15:15:52.268314 42405 rpc-test.cc:1543] Sending 1024MB via tcp socket: real 0.407s user 0.119s sys 0.287s
[ OK ] Parameters/TestRpc.TestPerformanceBySocketType/NoSSL_TCP (1987 ms)
[ RUN ] Parameters/TestRpc.TestPerformanceBySocketType/NoSSL_UnixSocket
I0409 15:15:52.271142 42405 rpc-test.cc:1529] Connecting to unix:@kudu-test-42405
I0409 15:15:52.605296 42405 rpc-test.cc:1543] Sending 1024MB via unix socket: real 0.333s user 0.098s sys 0.235s
I0409 15:15:52.934170 42405 rpc-test.cc:1543] Sending 1024MB via unix socket: real 0.329s user 0.107s sys 0.222s
I0409 15:15:53.264912 42405 rpc-test.cc:1543] Sending 1024MB via unix socket: real 0.331s user 0.115s sys 0.216s
I0409 15:15:53.592478 42405 rpc-test.cc:1543] Sending 1024MB via unix socket: real 0.328s user 0.115s sys 0.212s
I0409 15:15:53.918494 42405 rpc-test.cc:1543] Sending 1024MB via unix socket: real 0.326s user 0.107s sys 0.219s
The results show about a 1.2x throughput improvement using unix sockets.
Another benefit we can make use of in the future is the ability to pass file
descriptors over sockets: this would enable us to do true "zero-copy" between
the server and client, which should have more significant benefits than seen
here.
Additionally, this patch will probably help get us closer to IPv6 support as
well, since it gets rid of some sockaddr_in-specific code paths in the RPC
stack.
Change-Id: I40ce3e4e1b98f806f0c29d2fbd88789657218c4b
Reviewed-on: http://gerrit.cloudera.org:8080/15691
Reviewed-by: Bankim Bhavsar <[email protected]>
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <[email protected]>
---
src/kudu/hms/hms_client-test.cc | 6 +-
src/kudu/mini-cluster/mini_cluster.cc | 2 +-
src/kudu/rpc/CMakeLists.txt | 2 +-
src/kudu/rpc/acceptor_pool.cc | 14 +-
src/kudu/rpc/connection.cc | 2 +-
src/kudu/rpc/connection_id.cc | 2 +-
src/kudu/rpc/messenger.cc | 2 +-
src/kudu/rpc/mt-rpc-test.cc | 2 +-
src/kudu/rpc/negotiation-test.cc | 16 +-
src/kudu/rpc/reactor.cc | 12 +-
src/kudu/rpc/reactor.h | 2 +-
src/kudu/rpc/rpc-test-base.h | 8 +-
src/kudu/rpc/rpc-test.cc | 412 +++++++++++++++++++---------------
src/kudu/rpc/server_negotiation.cc | 2 +
src/kudu/rpc/server_negotiation.h | 2 +-
src/kudu/security/tls_socket-test.cc | 4 +-
src/kudu/security/tls_socket.cc | 17 +-
src/kudu/security/tls_socket.h | 2 +
src/kudu/util/net/net_util-test.cc | 36 ++-
src/kudu/util/net/net_util.cc | 2 +-
src/kudu/util/net/sockaddr.cc | 98 +++++++-
src/kudu/util/net/sockaddr.h | 43 +++-
src/kudu/util/net/socket-test.cc | 111 +++++++--
src/kudu/util/net/socket.cc | 35 +--
src/kudu/util/net/socket.h | 4 +-
25 files changed, 575 insertions(+), 263 deletions(-)
diff --git a/src/kudu/hms/hms_client-test.cc b/src/kudu/hms/hms_client-test.cc
index b2555bf..e5e0044 100644
--- a/src/kudu/hms/hms_client-test.cc
+++ b/src/kudu/hms/hms_client-test.cc
@@ -408,7 +408,7 @@ TEST_F(HmsClientTest, TestHmsConnect) {
// Listening, but not accepting socket.
Sockaddr listening;
Socket listening_socket;
- ASSERT_OK(listening_socket.Init(0));
+ ASSERT_OK(listening_socket.Init(loopback.family(), 0));
ASSERT_OK(listening_socket.BindAndListen(loopback, 1));
listening_socket.GetSocketAddress(&listening);
ASSERT_TRUE(start_client(listening).IsTimedOut());
@@ -416,7 +416,7 @@ TEST_F(HmsClientTest, TestHmsConnect) {
// Bound, but not listening socket.
Sockaddr bound;
Socket bound_socket;
- ASSERT_OK(bound_socket.Init(0));
+ ASSERT_OK(bound_socket.Init(loopback.family(), 0));
ASSERT_OK(bound_socket.Bind(loopback));
bound_socket.GetSocketAddress(&bound);
ASSERT_TRUE(start_client(bound).IsNetworkError());
@@ -424,7 +424,7 @@ TEST_F(HmsClientTest, TestHmsConnect) {
// Unbound socket.
Sockaddr unbound;
Socket unbound_socket;
- ASSERT_OK(unbound_socket.Init(0));
+ ASSERT_OK(unbound_socket.Init(loopback.family(), 0));
ASSERT_OK(unbound_socket.Bind(loopback));
unbound_socket.GetSocketAddress(&unbound);
ASSERT_OK(unbound_socket.Close());
diff --git a/src/kudu/mini-cluster/mini_cluster.cc
b/src/kudu/mini-cluster/mini_cluster.cc
index ffc97eb..3a48cda 100644
--- a/src/kudu/mini-cluster/mini_cluster.cc
+++ b/src/kudu/mini-cluster/mini_cluster.cc
@@ -66,7 +66,7 @@ Status MiniCluster::ReserveDaemonSocket(DaemonType type,
RETURN_NOT_OK(sock_addr.ParseString(ip, 0));
unique_ptr<Socket> sock(new Socket());
- RETURN_NOT_OK(sock->Init(0));
+ RETURN_NOT_OK(sock->Init(sock_addr.family(), 0));
RETURN_NOT_OK(sock->SetReuseAddr(true));
RETURN_NOT_OK(sock->SetReusePort(true));
RETURN_NOT_OK(sock->Bind(sock_addr));
diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index 1190968..273842d 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -132,6 +132,6 @@ ADD_KUDU_TEST(periodic-test)
ADD_KUDU_TEST(reactor-test)
ADD_KUDU_TEST(request_tracker-test)
ADD_KUDU_TEST(rpc-bench RUN_SERIAL true)
-ADD_KUDU_TEST(rpc-test)
+ADD_KUDU_TEST(rpc-test NUM_SHARDS 8)
ADD_KUDU_TEST(rpc_stub-test)
ADD_KUDU_TEST(service_queue-test RUN_SERIAL true)
diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index f1f2270..f466c59 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -160,12 +160,14 @@ void AcceptorPool::RunThread() {
<< THROTTLE_MSG;
continue;
}
- s = new_sock.SetNoDelay(true);
- if (!s.ok()) {
- KLOG_EVERY_N_SECS(WARNING, 1) << "Acceptor with remote = " <<
remote.ToString()
- << " failed to set TCP_NODELAY on a newly accepted socket: "
- << s.ToString() << THROTTLE_MSG;
- continue;
+ if (remote.is_ip()) {
+ s = new_sock.SetNoDelay(true);
+ if (!s.ok()) {
+ KLOG_EVERY_N_SECS(WARNING, 1) << "Acceptor with remote = " <<
remote.ToString()
+ << " failed to set TCP_NODELAY on a
newly accepted socket: "
+ << s.ToString() << THROTTLE_MSG;
+ continue;
+ }
}
rpc_connections_accepted_->Increment();
messenger_->RegisterInboundSocket(&new_sock, remote);
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index f45f1d3..0350fad 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -907,7 +907,7 @@ Status Connection::DumpPB(const DumpConnectionsRequestPB&
req,
LOG(FATAL);
}
#ifdef __linux__
- if (negotiation_complete_) {
+ if (negotiation_complete_ && remote_.is_ip()) {
// TODO(todd): it's a little strange to not set socket level stats during
// negotiation, but we don't have access to the socket here until
negotiation
// is complete.
diff --git a/src/kudu/rpc/connection_id.cc b/src/kudu/rpc/connection_id.cc
index ab98715..9728f01 100644
--- a/src/kudu/rpc/connection_id.cc
+++ b/src/kudu/rpc/connection_id.cc
@@ -52,7 +52,7 @@ void ConnectionId::set_network_plane(string network_plane) {
string ConnectionId::ToString() const {
string remote;
- if (hostname_ != remote_.host()) {
+ if (remote_.is_ip() && hostname_ != remote_.host()) {
remote = strings::Substitute("$0 ($1)", remote_.ToString(), hostname_);
} else {
remote = remote_.ToString();
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 5df8fb5..c000303 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -307,7 +307,7 @@ Status Messenger::AddAcceptorPool(const Sockaddr
&accept_addr,
}
Socket sock;
- RETURN_NOT_OK(sock.Init(0));
+ RETURN_NOT_OK(sock.Init(accept_addr.family(), 0));
RETURN_NOT_OK(sock.SetReuseAddr(true));
if (reuseport_) {
RETURN_NOT_OK(sock.SetReusePort(true));
diff --git a/src/kudu/rpc/mt-rpc-test.cc b/src/kudu/rpc/mt-rpc-test.cc
index 1207333..d16c761 100644
--- a/src/kudu/rpc/mt-rpc-test.cc
+++ b/src/kudu/rpc/mt-rpc-test.cc
@@ -266,7 +266,7 @@ TEST_F(MultiThreadedRpcTest, TestBlowOutServiceQueue) {
static void HammerServerWithTCPConns(const Sockaddr& addr) {
while (true) {
Socket socket;
- CHECK_OK(socket.Init(0));
+ CHECK_OK(socket.Init(addr.family(), 0));
Status s;
LOG_SLOW_EXECUTION(INFO, 100, "Connect took long") {
s = socket.Connect(addr);
diff --git a/src/kudu/rpc/negotiation-test.cc b/src/kudu/rpc/negotiation-test.cc
index 99da1ca..4498e4a 100644
--- a/src/kudu/rpc/negotiation-test.cc
+++ b/src/kudu/rpc/negotiation-test.cc
@@ -232,13 +232,13 @@ TEST_P(TestNegotiation, TestNegotiation) {
// Create the listening socket, client socket, and server socket.
Socket listening_socket;
- ASSERT_OK(listening_socket.Init(0));
- ASSERT_OK(listening_socket.BindAndListen(Sockaddr::Wildcard(), 1));
- Sockaddr server_addr;
+ Sockaddr server_addr = Sockaddr::Wildcard();
+ ASSERT_OK(listening_socket.Init(server_addr.family(), 0));
+ ASSERT_OK(listening_socket.BindAndListen(server_addr, 1));
ASSERT_OK(listening_socket.GetSocketAddress(&server_addr));
unique_ptr<Socket> client_socket(new Socket());
- ASSERT_OK(client_socket->Init(0));
+ ASSERT_OK(client_socket->Init(server_addr.family(), 0));
client_socket->Connect(server_addr);
unique_ptr<Socket> server_socket(desc.use_test_socket ?
@@ -1004,14 +1004,14 @@ static void RunAcceptingDelegator(Socket* acceptor,
static void RunNegotiationTest(const SocketCallable& server_runner,
const SocketCallable& client_runner) {
Socket server_sock;
- CHECK_OK(server_sock.Init(0));
- ASSERT_OK(server_sock.BindAndListen(Sockaddr::Wildcard(), 1));
- Sockaddr server_bind_addr;
+ Sockaddr server_bind_addr = Sockaddr::Wildcard();
+ CHECK_OK(server_sock.Init(server_bind_addr.family(), 0));
+ ASSERT_OK(server_sock.BindAndListen(server_bind_addr, 1));
ASSERT_OK(server_sock.GetSocketAddress(&server_bind_addr));
thread server(RunAcceptingDelegator, &server_sock, server_runner);
unique_ptr<Socket> client_sock(new Socket());
- CHECK_OK(client_sock->Init(0));
+ CHECK_OK(client_sock->Init(server_bind_addr.family(), 0));
ASSERT_OK(client_sock->Connect(server_bind_addr));
thread client(client_runner, std::move(client_sock));
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 3b48653..771b9f8 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -19,6 +19,7 @@
#include <openssl/crypto.h>
#include <openssl/err.h> // IWYU pragma: keep
+#include <sys/socket.h>
#include <cerrno>
#include <functional>
@@ -569,7 +570,7 @@ Status ReactorThread::FindOrStartConnection(const
ConnectionId& conn_id,
// Create a new socket and start connecting to the remote.
Socket sock;
- RETURN_NOT_OK(CreateClientSocket(&sock));
+ RETURN_NOT_OK(CreateClientSocket(conn_id.remote().family(), &sock));
RETURN_NOT_OK(StartConnect(&sock, conn_id.remote()));
unique_ptr<Socket> new_socket(new Socket(sock.Release()));
@@ -634,7 +635,8 @@ void ReactorThread::CompleteConnectionNegotiation(
return;
}
- if (FLAGS_tcp_keepalive_probe_period_s > 0) {
+ if (conn->remote().is_ip() &&
+ FLAGS_tcp_keepalive_probe_period_s > 0) {
// Try spreading out the idle poll period to avoid thundering herd in case
connections
// are all created at the same time (e.g. after a cluster is restarted).
Status keepalive_status = conn->SetTcpKeepAlive(
@@ -652,9 +654,9 @@ void ReactorThread::CompleteConnectionNegotiation(
conn->EpollRegister(loop_);
}
-Status ReactorThread::CreateClientSocket(Socket* sock) {
- Status ret = sock->Init(Socket::FLAG_NONBLOCKING);
- if (ret.ok()) {
+Status ReactorThread::CreateClientSocket(int family, Socket* sock) {
+ Status ret = sock->Init(family, Socket::FLAG_NONBLOCKING);
+ if (ret.ok() && family == AF_INET) {
ret = sock->SetNoDelay(true);
}
LOG_IF(WARNING, !ret.ok())
diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h
index ae279fb..8a966c0 100644
--- a/src/kudu/rpc/reactor.h
+++ b/src/kudu/rpc/reactor.h
@@ -250,7 +250,7 @@ class ReactorThread {
void ScanIdleConnections();
// Create a new client socket (non-blocking, NODELAY)
- static Status CreateClientSocket(Socket* sock);
+ static Status CreateClientSocket(int family, Socket* sock);
// Initiate a new connection on the given socket.
static Status StartConnect(Socket* sock, const Sockaddr& remote);
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 1710e9c..5d19137 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -606,7 +606,7 @@ class RpcTestBase : public KuduTest {
static Status StartFakeServer(Socket *listen_sock, Sockaddr *listen_addr) {
Sockaddr bind_addr = Sockaddr::Wildcard();
bind_addr.set_port(0);
- RETURN_NOT_OK(listen_sock->Init(0));
+ RETURN_NOT_OK(listen_sock->Init(bind_addr.family(), 0));
RETURN_NOT_OK(listen_sock->BindAndListen(bind_addr, 1));
RETURN_NOT_OK(listen_sock->GetSocketAddress(listen_addr));
LOG(INFO) << "Bound to: " << listen_addr->ToString();
@@ -631,6 +631,10 @@ class RpcTestBase : public KuduTest {
const std::string& rpc_ca_certificate_file = "",
const std::string& rpc_private_key_password_cmd =
"",
const std::shared_ptr<Messenger>& messenger =
nullptr) {
+ // Default to binding on wildcard unless specified as an in-out parameter.
+ if (!server_addr->is_initialized()) {
+ *server_addr = Sockaddr::Wildcard();
+ }
if (!messenger) {
RETURN_NOT_OK(CreateMessenger(
"TestServer", &server_messenger_, n_server_reactor_threads_,
enable_ssl,
@@ -640,7 +644,7 @@ class RpcTestBase : public KuduTest {
server_messenger_ = messenger;
}
std::shared_ptr<AcceptorPool> pool;
- RETURN_NOT_OK(server_messenger_->AddAcceptorPool(Sockaddr::Wildcard(),
&pool));
+ RETURN_NOT_OK(server_messenger_->AddAcceptorPool(*server_addr, &pool));
RETURN_NOT_OK(pool->Start(2));
*server_addr = pool->bind_address();
mem_tracker_ = MemTracker::CreateTracker(-1, "result_tracker");
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 92c9671..7415877 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#include <unistd.h>
+
#include <cerrno>
#include <cstdint>
#include <cstdlib>
@@ -65,6 +67,7 @@
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
@@ -77,6 +80,7 @@ DECLARE_int32(tcp_keepalive_probe_period_s);
DECLARE_int32(tcp_keepalive_retry_period_s);
DECLARE_int32(tcp_keepalive_retry_count);
+using std::tuple;
using std::shared_ptr;
using std::string;
using std::thread;
@@ -85,18 +89,58 @@ using std::unordered_map;
using std::vector;
namespace kudu {
+
+using strings::Substitute;
+
namespace rpc {
-class TestRpc : public RpcTestBase, public ::testing::WithParamInterface<bool>
{
+// RPC proxies require a hostname to be passed. In this test we're just
connecting to
+// the wildcard, so we'll hard-code this hostname instead.
+static const char* const kRemoteHostName = "localhost";
+
+class TestRpc : public RpcTestBase, public
::testing::WithParamInterface<tuple<bool,bool>> {
+ protected:
+ TestRpc() {
+ }
+
+ bool enable_ssl() const {
+ return std::get<0>(GetParam());
+ }
+ bool use_unix_socket() const {
+ return std::get<1>(GetParam());
+ }
+ Sockaddr bind_addr() const {
+ if (use_unix_socket()) {
+ Sockaddr addr;
+ string path = Substitute("@kudu-test-$0", getpid());
+ CHECK_OK(addr.ParseUnixDomainPath(path));
+ return addr;
+ }
+ return Sockaddr::Wildcard();
+ }
+ static string expected_remote_str(const Sockaddr& bound_addr) {
+ if (bound_addr.is_ip()) {
+ return Substitute("$0 ($1)", bound_addr.ToString(), kRemoteHostName);
+ }
+ return bound_addr.ToString();
+ }
};
-// This is used to run all parameterized tests with and without SSL.
-INSTANTIATE_TEST_CASE_P(OptionalSSL, TestRpc, testing::Values(false, true));
+// This is used to run all parameterized tests with and without SSL, on Unix
sockets
+// and TCP.
+INSTANTIATE_TEST_CASE_P(Parameters, TestRpc,
+ testing::Combine(testing::Values(false, true),
+ testing::Values(false, true)),
+ [](const testing::TestParamInfo<tuple<bool, bool>>&
info) {
+ return Substitute("$0_$1",
+ std::get<0>(info.param) ? "SSL" :
"NoSSL",
+ std::get<1>(info.param) ?
"UnixSocket" : "TCP");
+ });
TEST_P(TestRpc, TestMessengerCreateDestroy) {
shared_ptr<Messenger> messenger;
- ASSERT_OK(CreateMessenger("TestCreateDestroy", &messenger, 1, GetParam()));
+ ASSERT_OK(CreateMessenger("TestCreateDestroy", &messenger, 1, enable_ssl()));
LOG(INFO) << "started messenger " << messenger->name();
messenger->Shutdown();
}
@@ -109,12 +153,15 @@ TEST_P(TestRpc, TestAcceptorPoolStartStop) {
int n_iters = AllowSlowTests() ? 100 : 5;
for (int i = 0; i < n_iters; i++) {
shared_ptr<Messenger> messenger;
- ASSERT_OK(CreateMessenger("TestAcceptorPoolStartStop", &messenger, 1,
GetParam()));
+ ASSERT_OK(CreateMessenger("TestAcceptorPoolStartStop", &messenger, 1,
enable_ssl()));
shared_ptr<AcceptorPool> pool;
- ASSERT_OK(messenger->AddAcceptorPool(Sockaddr::Wildcard(), &pool));
+ ASSERT_OK(messenger->AddAcceptorPool(bind_addr(), &pool));
Sockaddr bound_addr;
ASSERT_OK(pool->GetBoundAddress(&bound_addr));
- ASSERT_NE(0, bound_addr.port());
+ ASSERT_TRUE(bound_addr.is_initialized());
+ if (!use_unix_socket()) {
+ ASSERT_NE(0, bound_addr.port());
+ }
ASSERT_OK(pool->Start(2));
messenger->Shutdown();
}
@@ -130,7 +177,6 @@ TEST_F(TestRpc, TestConnHeaderValidation) {
// Regression test for KUDU-2041
TEST_P(TestRpc, TestNegotiationDeadlock) {
- bool enable_ssl = GetParam();
// The deadlock would manifest in cases where the number of concurrent
connection
// requests >= the number of threads. 1 thread and 1 cnxn to ourself is just
the easiest
@@ -140,15 +186,15 @@ TEST_P(TestRpc, TestNegotiationDeadlock) {
mb.set_min_negotiation_threads(1)
.set_max_negotiation_threads(1)
.set_metric_entity(metric_entity_);
- if (enable_ssl) mb.enable_inbound_tls();
+ if (enable_ssl()) mb.enable_inbound_tls();
shared_ptr<Messenger> messenger;
CHECK_OK(mb.Build(&messenger));
- Sockaddr server_addr;
- ASSERT_OK(StartTestServerWithCustomMessenger(&server_addr, messenger,
enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServerWithCustomMessenger(&server_addr, messenger,
enable_ssl()));
- Proxy p(messenger, server_addr, server_addr.host(),
+ Proxy p(messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
}
@@ -156,19 +202,18 @@ TEST_P(TestRpc, TestNegotiationDeadlock) {
// Test making successful RPC calls.
TEST_P(TestRpc, TestCall) {
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
LOG(INFO) << "Connecting to " << server_addr.ToString();
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
- ASSERT_STR_CONTAINS(p.ToString(),
strings::Substitute("kudu.rpc.GenericCalculatorService@"
- "{remote=$0,
user_credentials=",
-
server_addr.ToString()));
+ ASSERT_STR_CONTAINS(p.ToString(),
Substitute("kudu.rpc.GenericCalculatorService@"
+ "{remote=$0, user_credentials=",
+
expected_remote_str(server_addr)));
for (int i = 0; i < 10; i++) {
ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
@@ -177,9 +222,8 @@ TEST_P(TestRpc, TestCall) {
// Test for KUDU-2091 and KUDU-2220.
TEST_P(TestRpc, TestCallWithChainCertAndChainCA) {
- bool enable_ssl = GetParam();
// We're only interested in running this test with TLS enabled.
- if (!enable_ssl) return;
+ if (!enable_ssl()) return;
string rpc_certificate_file;
string rpc_private_key_file;
@@ -189,29 +233,28 @@ TEST_P(TestRpc, TestCallWithChainCertAndChainCA) {
&rpc_private_key_file,
&rpc_ca_certificate_file));
// Set up server.
- Sockaddr server_addr;
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
SCOPED_TRACE(strings::Substitute("Connecting to $0",
server_addr.ToString()));
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl,
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl(),
rpc_certificate_file, rpc_private_key_file, rpc_ca_certificate_file));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
ASSERT_STR_CONTAINS(p.ToString(),
strings::Substitute("kudu.rpc.GenericCalculatorService@"
"{remote=$0,
user_credentials=",
-
server_addr.ToString()));
+
expected_remote_str(server_addr)));
ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
}
// Test for KUDU-2041.
TEST_P(TestRpc, TestCallWithChainCertAndRootCA) {
- bool enable_ssl = GetParam();
// We're only interested in running this test with TLS enabled.
- if (!enable_ssl) return;
+ if (!enable_ssl()) return;
string rpc_certificate_file;
string rpc_private_key_file;
@@ -221,20 +264,20 @@ TEST_P(TestRpc, TestCallWithChainCertAndRootCA) {
&rpc_private_key_file,
&rpc_ca_certificate_file));
// Set up server.
- Sockaddr server_addr;
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
SCOPED_TRACE(strings::Substitute("Connecting to $0",
server_addr.ToString()));
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl,
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl(),
rpc_certificate_file, rpc_private_key_file, rpc_ca_certificate_file));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
ASSERT_STR_CONTAINS(p.ToString(),
strings::Substitute("kudu.rpc.GenericCalculatorService@"
"{remote=$0,
user_credentials=",
-
server_addr.ToString()));
+
expected_remote_str(server_addr)));
ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
}
@@ -242,9 +285,8 @@ TEST_P(TestRpc, TestCallWithChainCertAndRootCA) {
// Test making successful RPC calls while using a TLS certificate with a
password protected
// private key.
TEST_P(TestRpc, TestCallWithPasswordProtectedKey) {
- bool enable_ssl = GetParam();
// We're only interested in running this test with TLS enabled.
- if (!enable_ssl) return;
+ if (!enable_ssl()) return;
string rpc_certificate_file;
string rpc_private_key_file;
@@ -258,20 +300,20 @@ TEST_P(TestRpc, TestCallWithPasswordProtectedKey) {
rpc_ca_certificate_file = rpc_certificate_file;
rpc_private_key_password_cmd = strings::Substitute("echo $0", passwd);
// Set up server.
- Sockaddr server_addr;
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
SCOPED_TRACE(strings::Substitute("Connecting to $0",
server_addr.ToString()));
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl,
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl(),
rpc_certificate_file, rpc_private_key_file, rpc_ca_certificate_file,
rpc_private_key_password_cmd));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
ASSERT_STR_CONTAINS(p.ToString(),
strings::Substitute("kudu.rpc.GenericCalculatorService@"
"{remote=$0,
user_credentials=",
-
server_addr.ToString()));
+
expected_remote_str(server_addr)));
ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
}
@@ -279,9 +321,8 @@ TEST_P(TestRpc, TestCallWithPasswordProtectedKey) {
// Test that using a TLS certificate with a password protected private key and
providing
// the wrong password for that private key, causes a server startup failure.
TEST_P(TestRpc, TestCallWithBadPasswordProtectedKey) {
- bool enable_ssl = GetParam();
// We're only interested in running this test with TLS enabled.
- if (!enable_ssl) return;
+ if (!enable_ssl()) return;
string rpc_certificate_file;
string rpc_private_key_file;
@@ -297,8 +338,8 @@ TEST_P(TestRpc, TestCallWithBadPasswordProtectedKey) {
rpc_ca_certificate_file = rpc_certificate_file;
rpc_private_key_password_cmd = strings::Substitute("echo $0", passwd);
// Verify that the server fails to start up.
- Sockaddr server_addr;
- Status s = StartTestServer(&server_addr, enable_ssl, rpc_certificate_file,
rpc_private_key_file,
+ Sockaddr server_addr = bind_addr();
+ Status s = StartTestServer(&server_addr, enable_ssl(), rpc_certificate_file,
rpc_private_key_file,
rpc_ca_certificate_file, rpc_private_key_password_cmd);
ASSERT_TRUE(s.IsRuntimeError());
ASSERT_STR_CONTAINS(s.ToString(), "failed to load private key file");
@@ -307,7 +348,7 @@ TEST_P(TestRpc, TestCallWithBadPasswordProtectedKey) {
// Test that connecting to an invalid server properly throws an error.
TEST_P(TestRpc, TestCallToBadServer) {
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, GetParam()));
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
Sockaddr addr = Sockaddr::Wildcard();
addr.set_port(0);
Proxy p(client_messenger, addr, addr.host(),
@@ -325,15 +366,14 @@ TEST_P(TestRpc, TestCallToBadServer) {
// Test that RPC calls can be failed with an error status on the server.
TEST_P(TestRpc, TestInvalidMethodCall) {
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
LOG(INFO) << "Connecting to " << server_addr.ToString();
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
// Call the method which fails.
@@ -346,13 +386,12 @@ TEST_P(TestRpc, TestInvalidMethodCall) {
// is reasonable.
TEST_P(TestRpc, TestWrongService) {
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client with the wrong service name.
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
Proxy p(client_messenger, server_addr, "localhost", "WrongServiceName");
// Call the method which fails.
@@ -384,12 +423,11 @@ TEST_P(TestRpc, TestHighFDs) {
}
// Set up server and client, and verify we can make a successful call.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
}
@@ -402,15 +440,14 @@ TEST_P(TestRpc, TestConnectionKeepalive) {
keepalive_time_ms_ = 500;
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
LOG(INFO) << "Connecting to " << server_addr.ToString();
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
@@ -447,15 +484,14 @@ TEST_P(TestRpc, TestConnectionAlwaysKeepalive) {
keepalive_time_ms_ = -1;
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
LOG(INFO) << "Connecting to " << server_addr.ToString();
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
@@ -484,16 +520,15 @@ TEST_P(TestRpc, TestConnectionAlwaysKeepalive) {
// Test that the metrics on a per connection level work accurately.
TEST_P(TestRpc, TestClientConnectionMetrics) {
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client with one reactor so that we can grab the metrics from just
// that reactor.
LOG(INFO) << "Connecting to " << server_addr.ToString();
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
// Here we queue a bunch of calls to the server and test that the sender's
@@ -539,15 +574,17 @@ TEST_P(TestRpc, TestClientConnectionMetrics) {
ASSERT_GT(conn.outbound_queue_size(), 0);
#ifdef __linux__
- // Test that the socket statistics are present. We only assert on those that
- // we know to be present on all kernel versions.
- ASSERT_TRUE(conn.has_socket_stats());
- ASSERT_GT(conn.socket_stats().rtt(), 0);
- ASSERT_GT(conn.socket_stats().rttvar(), 0);
- ASSERT_GT(conn.socket_stats().snd_cwnd(), 0);
- ASSERT_GT(conn.socket_stats().send_bytes_per_sec(), 0);
- ASSERT_TRUE(conn.socket_stats().has_send_queue_bytes());
- ASSERT_TRUE(conn.socket_stats().has_receive_queue_bytes());
+ if (!use_unix_socket()) {
+ // Test that the socket statistics are present. We only assert on those that
+ // we know to be present on all kernel versions.
+ ASSERT_TRUE(conn.has_socket_stats());
+ ASSERT_GT(conn.socket_stats().rtt(), 0);
+ ASSERT_GT(conn.socket_stats().rttvar(), 0);
+ ASSERT_GT(conn.socket_stats().snd_cwnd(), 0);
+ ASSERT_GT(conn.socket_stats().send_bytes_per_sec(), 0);
+ ASSERT_TRUE(conn.socket_stats().has_send_queue_bytes());
+ ASSERT_TRUE(conn.socket_stats().has_receive_queue_bytes());
+ }
#endif
// Unblock all of the calls and wait for them to finish.
@@ -573,15 +610,14 @@ TEST_P(TestRpc, TestReopenOutboundConnections) {
n_server_reactor_threads_ = 1;
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
LOG(INFO) << "Connecting to " << server_addr.ToString();
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
// Verify the initial counters.
@@ -616,15 +652,14 @@ TEST_P(TestRpc, TestCredentialsPolicy) {
n_server_reactor_threads_ = 1;
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
LOG(INFO) << "Connecting to " << server_addr.ToString();
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
// Verify the initial counters.
@@ -689,17 +724,16 @@ TEST_P(TestRpc, TestConnectionNetworkPlane) {
keepalive_time_ms_ = -1;
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up clients with default and non-default network planes.
LOG(INFO) << "Connecting to " << server_addr.ToString();
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p1(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p1(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
- Proxy p2(client_messenger, server_addr, server_addr.host(),
+ Proxy p2(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
p2.set_network_plane("control-channel");
@@ -757,14 +791,13 @@ TEST_P(TestRpc, TestCallLongerThanKeepalive) {
keepalive_time_ms_ = 1000;
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
// Make a call which sleeps longer than the keepalive.
@@ -781,17 +814,16 @@ TEST_P(TestRpc, TestCallLongerThanKeepalive) {
// and verifies that the call succeeds (i.e. the connection is not closed).
TEST_P(TestRpc, TestTCPKeepalive) {
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
FLAGS_tcp_keepalive_probe_period_s = 1;
FLAGS_tcp_keepalive_retry_period_s = 1;
FLAGS_tcp_keepalive_retry_count = 1;
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
// Make a call which sleeps for longer than TCP keepalive probe period,
@@ -808,14 +840,13 @@ TEST_P(TestRpc, TestTCPKeepalive) {
// Test that the RpcSidecar transfers the expected messages.
TEST_P(TestRpc, TestRpcSidecar) {
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, GetParam()));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
// Test a zero-length sidecar
@@ -838,14 +869,13 @@ TEST_P(TestRpc, TestRpcSidecar) {
// call.
TEST_P(TestRpc, TestMaxSmallSidecars) {
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, GetParam()));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
Random rng(GetRandomSeed32());
@@ -907,14 +937,14 @@ TEST_P(TestRpc, TestRpcSidecarLimits) {
FLAGS_rpc_max_message_size = rpc_max_message_size_val;
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, GetParam()));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
RpcController controller;
@@ -933,6 +963,8 @@ TEST_P(TestRpc, TestRpcSidecarLimits) {
ASSERT_STR_MATCHES(status.ToString(),
// Linux
"Connection reset by peer"
+ // Linux domain socket
+ "|Broken pipe"
// While reading from socket.
"|recv got EOF from"
// Linux, SSL enabled
@@ -946,12 +978,11 @@ TEST_P(TestRpc, TestRpcSidecarLimits) {
// Test that timeouts are properly handled.
TEST_P(TestRpc, TestCallTimeout) {
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
// Test a very short timeout - we expect this will time out while the
@@ -976,12 +1007,11 @@ TEST_P(TestRpc, TestCallTimeout) {
// was assigned the timeout of the first call on that connection. So, if the first
// call had a short timeout, the later call would also inherit the timed-out negotiation.
TEST_P(TestRpc, TestCallTimeoutDoesntAffectNegotiation) {
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
FLAGS_rpc_negotiation_inject_delay_ms = 500;
@@ -1013,7 +1043,7 @@ static void AcceptAndReadForever(Socket* listen_sock) {
// Ensures that the client gets a reasonable status code in this case.
TEST_F(TestRpc, TestNegotiationTimeout) {
// Set up a simple socket server which accepts a connection.
- Sockaddr server_addr;
+ Sockaddr server_addr = Sockaddr::Wildcard();
Socket listen_sock;
ASSERT_OK(StartFakeServer(&listen_sock, &server_addr));
@@ -1024,7 +1054,7 @@ TEST_F(TestRpc, TestNegotiationTimeout) {
// Set up client.
shared_ptr<Messenger> client_messenger;
ASSERT_OK(CreateMessenger("Client", &client_messenger));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
bool is_negotiation_error = false;
@@ -1035,9 +1065,9 @@ TEST_F(TestRpc, TestNegotiationTimeout) {
// Test that client calls get failed properly when the server they're connected to
// shuts down.
-TEST_F(TestRpc, TestServerShutsDown) {
+TEST_P(TestRpc, TestServerShutsDown) {
// Set up a simple socket server which accepts a connection.
- Sockaddr server_addr;
+ Sockaddr server_addr = bind_addr();
Socket listen_sock;
ASSERT_OK(StartFakeServer(&listen_sock, &server_addr));
@@ -1045,7 +1075,7 @@ TEST_F(TestRpc, TestServerShutsDown) {
LOG(INFO) << "Connecting to " << server_addr.ToString();
shared_ptr<Messenger> client_messenger;
ASSERT_OK(CreateMessenger("Client", &client_messenger));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
// Send a call.
@@ -1128,14 +1158,13 @@ TEST_P(TestRpc, TestRpcHandlerLatencyMetric) {
const uint64_t sleep_micros = 20 * 1000;
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr, enable_ssl()));
// Set up client.
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
CalculatorService::static_service_name());
RpcController controller;
@@ -1174,7 +1203,7 @@ static void
DestroyMessengerCallback(shared_ptr<Messenger>* messenger,
TEST_P(TestRpc, TestRpcCallbackDestroysMessenger) {
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, GetParam()));
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
Sockaddr bad_addr = Sockaddr::Wildcard();
CountDownLatch latch(1);
@@ -1200,14 +1229,13 @@ TEST_P(TestRpc, TestRpcContextClientDeadline) {
const uint64_t sleep_micros = 20 * 1000;
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr, enable_ssl()));
// Set up client.
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
CalculatorService::static_service_name());
SleepRequestPB req;
@@ -1228,14 +1256,13 @@ TEST_P(TestRpc, TestRpcContextClientDeadline) {
// will make the server reject the call.
TEST_P(TestRpc, TestApplicationFeatureFlag) {
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr, enable_ssl()));
// Set up client.
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
CalculatorService::static_service_name());
{ // Supported flag
@@ -1271,14 +1298,13 @@ TEST_P(TestRpc,
TestApplicationFeatureFlagUnsupportedServer) {
kSupportedServerRpcFeatureFlags = {};
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr, enable_ssl()));
// Set up client.
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
CalculatorService::static_service_name());
{ // Required flag
@@ -1307,15 +1333,14 @@ TEST_P(TestRpc,
TestApplicationFeatureFlagUnsupportedServer) {
TEST_P(TestRpc, TestCancellation) {
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
LOG(INFO) << "Connecting to " << server_addr.ToString();
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
int timeout_ms = 10;
@@ -1376,15 +1401,14 @@ static void SleepCallback(uint8_t* payload,
CountDownLatch* latch) {
// Test to verify that sidecars aren't corrupted when cancelling an async RPC.
TEST_P(TestRpc, TestCancellationAsync) {
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
LOG(INFO) << "Connecting to " << server_addr.ToString();
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
RpcController controller;
@@ -1469,15 +1493,14 @@ static void SendAndCancelRpcs(Proxy* p, const Slice&
slice) {
// same client to the same server.
TEST_P(TestRpc, TestCancellationMultiThreads) {
// Set up server.
- Sockaddr server_addr;
- bool enable_ssl = GetParam();
- ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
// Set up client.
LOG(INFO) << "Connecting to " << server_addr.ToString();
shared_ptr<Messenger> client_messenger;
- ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
- Proxy p(client_messenger, server_addr, server_addr.host(),
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
GenericCalculatorService::static_service_name());
// Buffer used for sidecars by SendAndCancelRpcs().
@@ -1498,5 +1521,42 @@ TEST_P(TestRpc, TestCancellationMultiThreads) {
client_messenger->Shutdown();
}
+
+// Benchmark comparing IPv4 and UNIX domain socket throughput.
+//
+// Each round pushes kNumMb megabytes of sidecar data through the RPC stack in
+// kMbPerRpc-sized calls and logs the elapsed time, tagged with the socket type
+// and SSL setting of the current test parameterization. The loop itself makes
+// no assertions on the measured timings.
+TEST_P(TestRpc, TestPerformanceBySocketType) {
+  static constexpr int kNumMb = 1024;
+  static constexpr int kMbPerRpc = 4;
+  static constexpr int kNumRounds = 5;
+  static_assert(kNumMb % kMbPerRpc == 0, "total should be a multiple of per-RPC");
+
+  const vector<string> sidecars = { string(kMbPerRpc * 1024 * 1024, 'x') };
+
+  // Set up server.
+  Sockaddr server_addr = bind_addr();
+  ASSERT_OK(StartTestServer(&server_addr, enable_ssl()));
+
+  // Set up client.
+  LOG(INFO) << "Connecting to " << server_addr.ToString();
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
+  Proxy p(client_messenger, server_addr, kRemoteHostName,
+          GenericCalculatorService::static_service_name());
+
+  for (int round = 0; round < kNumRounds; round++) {
+    Stopwatch sw(Stopwatch::ALL_THREADS);
+    sw.start();
+    // Named distinctly from the round counter so the inner index does not
+    // shadow the outer one.
+    for (int rpc = 0; rpc < kNumMb / kMbPerRpc; rpc++) {
+      DoTestOutgoingSidecar(p, sidecars);
+    }
+    sw.stop();
+    LOG(INFO) << strings::Substitute(
+        "Sending $0MB via $1$2 socket: $3",
+        kNumMb,
+        enable_ssl() ? "ssl-enabled " : "",
+        use_unix_socket() ? "unix" : "tcp",
+        sw.elapsed().ToString());
+  }
+}
+
+
} // namespace rpc
} // namespace kudu
diff --git a/src/kudu/rpc/server_negotiation.cc
b/src/kudu/rpc/server_negotiation.cc
index c4babc3..303f0cf 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -18,6 +18,7 @@
#include "kudu/rpc/server_negotiation.h"
#include <sasl/sasl.h>
+#include <sys/socket.h>
#include <algorithm>
#include <cstdint>
@@ -984,6 +985,7 @@ int ServerNegotiation::PlainAuthCb(sasl_conn_t* /*conn*/,
}
bool ServerNegotiation::IsTrustedConnection(const Sockaddr& addr) {
+ if (addr.family() == AF_UNIX) return true;
static std::once_flag once;
std::call_once(once, [] {
g_trusted_subnets = new vector<Network>();
diff --git a/src/kudu/rpc/server_negotiation.h
b/src/kudu/rpc/server_negotiation.h
index 2582af1..7935569 100644
--- a/src/kudu/rpc/server_negotiation.h
+++ b/src/kudu/rpc/server_negotiation.h
@@ -213,7 +213,7 @@ class ServerNegotiation {
Status RecvConnectionContext(faststring* recv_buf) WARN_UNUSED_RESULT;
// Returns true if connection is from trusted subnets or local networks.
- bool IsTrustedConnection(const Sockaddr& addr);
+ static bool IsTrustedConnection(const Sockaddr& addr);
// The socket to the remote client.
std::unique_ptr<Socket> socket_;
diff --git a/src/kudu/security/tls_socket-test.cc
b/src/kudu/security/tls_socket-test.cc
index b39c25c..2db41d8 100644
--- a/src/kudu/security/tls_socket-test.cc
+++ b/src/kudu/security/tls_socket-test.cc
@@ -111,7 +111,7 @@ Status DoNegotiationSide(Socket* sock, TlsHandshake* tls,
const char* side) {
void TlsSocketTest::ConnectClient(const Sockaddr& addr, unique_ptr<Socket>*
sock) {
unique_ptr<Socket> client_sock(new Socket());
- ASSERT_OK(client_sock->Init(0));
+ ASSERT_OK(client_sock->Init(addr.family(), 0));
ASSERT_OK(client_sock->Connect(addr));
TlsHandshake client;
@@ -135,7 +135,7 @@ class EchoServer {
ASSERT_OK(server_tls_.Init());
ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
ASSERT_OK(listen_addr_.ParseString("127.0.0.1", 0));
- ASSERT_OK(listener_.Init(0));
+ ASSERT_OK(listener_.Init(listen_addr_.family(), 0));
ASSERT_OK(listener_.BindAndListen(listen_addr_, /*listen_queue_size=*/10));
ASSERT_OK(listener_.GetSocketAddress(&listen_addr_));
diff --git a/src/kudu/security/tls_socket.cc b/src/kudu/security/tls_socket.cc
index 5ae6c74..8f84c3f 100644
--- a/src/kudu/security/tls_socket.cc
+++ b/src/kudu/security/tls_socket.cc
@@ -44,6 +44,15 @@ namespace security {
TlsSocket::TlsSocket(int fd, c_unique_ptr<SSL> ssl)
: Socket(fd),
ssl_(std::move(ssl)) {
+  // Writev() brackets its writes with SetTcpCork() when this flag is set, so
+  // corking is on by default.
+  use_cork_ = true;
+#ifdef SO_DOMAIN
+  // SO_DOMAIN is not defined on every platform, so only query the socket's
+  // domain where the headers provide it. Elsewhere the default above stands.
+  if (fd >= 0) {
+    int dom;
+    socklen_t len = sizeof(dom);
+    // Disable corking on UNIX domain sockets (presumably because the cork
+    // option is TCP-specific -- confirm against SetTcpCork()). If the query
+    // fails, 'dom' is never read and corking stays enabled.
+    if (getsockopt(fd, SOL_SOCKET, SO_DOMAIN, &dom, &len) == 0 &&
+        dom == AF_UNIX) {
+      use_cork_ = false;
+    }
+  }
+#endif
}
TlsSocket::~TlsSocket() {
@@ -87,7 +96,9 @@ Status TlsSocket::Writev(const struct ::iovec *iov, int
iov_len, int64_t *nwritt
CHECK(ssl_);
*nwritten = 0;
// Allows packets to be aggresively be accumulated before sending.
- RETURN_NOT_OK(SetTcpCork(1));
+ if (use_cork_) {
+ RETURN_NOT_OK(SetTcpCork(1));
+ }
Status write_status = Status::OK();
for (int i = 0; i < iov_len; ++i) {
int32_t frame_size = iov[i].iov_len;
@@ -100,7 +111,9 @@ Status TlsSocket::Writev(const struct ::iovec *iov, int
iov_len, int64_t *nwritt
*nwritten += bytes_written;
if (bytes_written < frame_size) break;
}
- RETURN_NOT_OK(SetTcpCork(0));
+ if (use_cork_) {
+ RETURN_NOT_OK(SetTcpCork(0));
+}
// If we did manage to write something, but not everything, due to a
temporary socket
// error, then we should still return an OK status indicating a successful
_partial_
// write.
diff --git a/src/kudu/security/tls_socket.h b/src/kudu/security/tls_socket.h
index bf5693d..c0e503b 100644
--- a/src/kudu/security/tls_socket.h
+++ b/src/kudu/security/tls_socket.h
@@ -57,6 +57,8 @@ class TlsSocket : public Socket {
// Owned SSL handle.
c_unique_ptr<SSL> ssl_;
+
+ bool use_cork_;
};
} // namespace security
diff --git a/src/kudu/util/net/net_util-test.cc
b/src/kudu/util/net/net_util-test.cc
index d0ffd8d..ae85220 100644
--- a/src/kudu/util/net/net_util-test.cc
+++ b/src/kudu/util/net/net_util-test.cc
@@ -15,6 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+#include "kudu/util/net/net_util.h"
+
+#include <sys/socket.h>
+
#include <algorithm>
#include <cstdint>
#include <ostream>
@@ -26,9 +30,8 @@
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/util.h"
-#include "kudu/util/net/net_util.h"
-#include "kudu/util/net/socket.h"
#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
@@ -185,10 +188,10 @@ TEST_F(NetUtilTest, TestReverseLookup) {
}
TEST_F(NetUtilTest, TestLsof) {
+ Sockaddr addr = Sockaddr::Wildcard();
Socket s;
- ASSERT_OK(s.Init(0));
+ ASSERT_OK(s.Init(addr.family(), 0));
- Sockaddr addr = Sockaddr::Wildcard(); // wildcard
ASSERT_OK(s.BindAndListen(addr, 1));
ASSERT_OK(s.GetSocketAddress(&addr));
@@ -249,4 +252,29 @@ TEST_F(NetUtilTest, TestSockaddrEquality) {
ASSERT_TRUE(ip_port == copy);
}
+// Exercises parsing, formatting, classification and equality of UNIX domain
+// socket addresses, for both a filesystem path and an abstract-namespace name.
+TEST_F(NetUtilTest, TestUnixSockaddr) {
+  // A string without a leading '@' is parsed as a filesystem path.
+  Sockaddr path_addr;
+  ASSERT_OK(path_addr.ParseUnixDomainPath("/foo/bar"));
+  ASSERT_EQ(path_addr.family(), AF_UNIX);
+  ASSERT_EQ(path_addr.UnixDomainPath(), "/foo/bar");
+  ASSERT_EQ(path_addr.ToString(), "unix:/foo/bar");
+  ASSERT_EQ(Sockaddr::UnixAddressType::kPath, path_addr.unix_address_type());
+
+  // A leading '@' selects the abstract namespace; the '@' is kept when the
+  // address is rendered back to a string.
+  Sockaddr abstract_addr;
+  ASSERT_OK(abstract_addr.ParseUnixDomainPath("@my-abstract"));
+  ASSERT_EQ(abstract_addr.family(), AF_UNIX);
+  ASSERT_EQ(abstract_addr.UnixDomainPath(), "@my-abstract");
+  ASSERT_EQ(abstract_addr.ToString(), "unix:@my-abstract");
+  ASSERT_EQ(Sockaddr::UnixAddressType::kAbstractNamespace, abstract_addr.unix_address_type());
+
+  // Equality holds for an address and itself, and fails in both operand orders
+  // against a different UNIX address, the IPv4 wildcard, and a default-constructed
+  // address.
+  ASSERT_TRUE(path_addr == path_addr);
+  ASSERT_TRUE(abstract_addr == abstract_addr);
+  ASSERT_FALSE(path_addr == abstract_addr);
+  ASSERT_FALSE(abstract_addr == path_addr);
+  ASSERT_FALSE(path_addr == Sockaddr::Wildcard());
+  ASSERT_FALSE(Sockaddr::Wildcard() == path_addr);
+  ASSERT_FALSE(path_addr == Sockaddr());
+  ASSERT_FALSE(Sockaddr() == path_addr);
+}
+
} // namespace kudu
diff --git a/src/kudu/util/net/net_util.cc b/src/kudu/util/net/net_util.cc
index 6f5325d..f74b324 100644
--- a/src/kudu/util/net/net_util.cc
+++ b/src/kudu/util/net/net_util.cc
@@ -448,7 +448,7 @@ Status GetRandomPort(const string& address, uint16_t* port)
{
Sockaddr sockaddr;
sockaddr.ParseString(address, 0);
Socket listener;
- RETURN_NOT_OK(listener.Init(0));
+ RETURN_NOT_OK(listener.Init(sockaddr.family(), 0));
RETURN_NOT_OK(listener.Bind(sockaddr));
Sockaddr listen_address;
RETURN_NOT_OK(listener.GetSocketAddress(&listen_address));
diff --git a/src/kudu/util/net/sockaddr.cc b/src/kudu/util/net/sockaddr.cc
index 45957df..b680294 100644
--- a/src/kudu/util/net/sockaddr.cc
+++ b/src/kudu/util/net/sockaddr.cc
@@ -20,10 +20,13 @@
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>
+#include <sys/un.h>
#include <cerrno>
+#include <cstddef>
#include <cstring>
#include <limits>
+#include <ostream>
#include <string>
#include <glog/logging.h>
@@ -73,13 +76,17 @@ Sockaddr& Sockaddr::operator=(const Sockaddr& other)
noexcept {
return *this;
}
-Sockaddr::Sockaddr(const struct sockaddr_in& addr) {
+// Constructs from a generic socket address occupying 'len' bytes. The bytes
+// are copied verbatim into the internal storage, so 'len' must not exceed its
+// size.
+Sockaddr::Sockaddr(const struct sockaddr& addr, socklen_t len) {
+  // Guard the memcpy below against a length larger than the destination.
+  DCHECK_LE(len, sizeof(storage_));
+  set_length(len);
+  memcpy(&storage_, &addr, len);
+}
+
+// Constructs from an IPv4 address by delegating to the generic constructor.
+Sockaddr::Sockaddr(const struct sockaddr_in& addr) :
+    Sockaddr(reinterpret_cast<const struct sockaddr&>(addr), sizeof(addr)) {
DCHECK_EQ(AF_INET, addr.sin_family);
- set_length(sizeof(addr));
- memcpy(&storage_.in, &addr, sizeof(struct sockaddr_in));
}
-Status Sockaddr::ParseString(const std::string& s, uint16_t default_port) {
+Status Sockaddr::ParseString(const string& s, uint16_t default_port) {
HostPort hp;
RETURN_NOT_OK(hp.ParseString(s, default_port));
@@ -94,6 +101,64 @@ Status Sockaddr::ParseString(const std::string& s, uint16_t
default_port) {
return Status::OK();
}
+// Parses 's' as a UNIX domain socket address and stores it in this object.
+//
+// A leading '@' selects the abstract namespace: the '@' is stored as a NUL byte
+// and the address length covers exactly the name. Any other non-empty string is
+// treated as a filesystem path and stored NUL-terminated.
+//
+// Returns InvalidArgument if 's' is empty, if a filesystem path contains an
+// embedded NUL byte, or if 's' does not fit in sockaddr_un::sun_path.
+Status Sockaddr::ParseUnixDomainPath(const string& s) {
+  constexpr auto kMaxPathSize = SIZEOF_MEMBER(struct sockaddr_un, sun_path);
+  if (s.empty()) {
+    // An empty path would leave sun_path[0] == '\0', which unix_address_type()
+    // classifies as an abstract-namespace address with an unspecified name.
+    return Status::InvalidArgument("UNIX domain socket path must not be empty");
+  }
+  if (s[0] == '@') {
+    // Specify a path in the abstract namespace.
+    if (s.size() > kMaxPathSize) {
+      return Status::InvalidArgument(Substitute(
+          "UNIX domain socket path exceeds maximum length $0", kMaxPathSize));
+    }
+    set_length(offsetof(struct sockaddr_un, sun_path) + s.size());
+    storage_.un.sun_family = AF_UNIX;
+    memcpy(storage_.un.sun_path, s.data(), s.size());
+    // The leading '@' is only the textual marker; the stored name starts with NUL.
+    storage_.un.sun_path[0] = '\0';
+  } else {
+    // Path names must be null-terminated. The null-terminated length
+    // may not match the length s.size().
+    const size_t c_len = strlen(s.c_str());
+    if (c_len != s.size()) {
+      return Status::InvalidArgument("UNIX domain socket path must not contain null bytes");
+    }
+    // Per unix(7) the length of the path name, including the terminating null
+    // byte, should not exceed the size of sun_path.
+    if (c_len + 1 > kMaxPathSize) {
+      return Status::InvalidArgument(Substitute(
+          "UNIX domain socket path exceeds maximum length $0", kMaxPathSize));
+    }
+    // unix(7) says the addrlen can be specified as the full length of the
+    // structure.
+    set_length(sizeof(struct sockaddr_un));
+    storage_.un.sun_family = AF_UNIX;
+    // The address length covers all of sun_path, so clear it before copying:
+    // the bytes after the terminator then have a fixed value for byte-wise
+    // comparisons of whole addresses.
+    memset(storage_.un.sun_path, 0, sizeof(storage_.un.sun_path));
+    memcpy(storage_.un.sun_path, s.c_str(), c_len + 1);
+  }
+  return Status::OK();
+}
+
+// Returns the textual form of this UNIX domain address: "<unnamed>" for an
+// unnamed socket, the filesystem path for a path address, or '@' followed by
+// the name for an abstract-namespace address.
+//
+// REQUIRES: family() is AF_UNIX (enforced with CHECK).
+string Sockaddr::UnixDomainPath() const {
+  CHECK_EQ(family(), AF_UNIX);
+  // Number of bytes of sun_path covered by the stored address length.
+  const size_t path_bytes = len_ - offsetof(struct sockaddr_un, sun_path);
+  switch (unix_address_type()) {
+    case UnixAddressType::kUnnamed:
+      return "<unnamed>";
+    case UnixAddressType::kPath:
+      // Bound the scan by the address length: per unix(7) a pathname that
+      // fills sun_path completely need not be NUL-terminated.
+      return string(storage_.un.sun_path, strnlen(storage_.un.sun_path, path_bytes));
+    case UnixAddressType::kAbstractNamespace: {
+      // Skip the leading NUL marker; the rest of the covered bytes are the name.
+      const size_t len = path_bytes - 1;
+      return "@" + string(storage_.un.sun_path + 1, len);
+    }
+  }
+  // Not reachable for valid enum values; keeps every control path returning.
+  LOG(FATAL) << "unknown UNIX address type";
+  return "";
+}
+
+// Classifies this UNIX domain address as unnamed, abstract-namespace or
+// filesystem path, based on the stored length and the first path byte.
+//
+// REQUIRES: family() is AF_UNIX (enforced with CHECK).
+Sockaddr::UnixAddressType Sockaddr::unix_address_type() const {
+  CHECK_EQ(family(), AF_UNIX);
+  // An address consisting solely of the family field carries no name.
+  const bool has_name = len_ != sizeof(sa_family_t);
+  if (!has_name) {
+    return UnixAddressType::kUnnamed;
+  }
+  // A named address whose first path byte is NUL is in the abstract namespace;
+  // anything else is a filesystem path.
+  return storage_.un.sun_path[0] == '\0' ? UnixAddressType::kAbstractNamespace
+                                         : UnixAddressType::kPath;
+}
+
Sockaddr& Sockaddr::operator=(const struct sockaddr_in &addr) {
set_length(sizeof(addr));
memcpy(&storage_, &addr, sizeof(addr));
@@ -104,6 +169,7 @@ Sockaddr& Sockaddr::operator=(const struct sockaddr_in
&addr) {
bool Sockaddr::operator==(const Sockaddr& other) const {
return BytewiseCompare(*this, other) == 0;
}
+
int Sockaddr::BytewiseCompare(const Sockaddr& a, const Sockaddr& b) {
Slice a_slice(reinterpret_cast<const uint8_t*>(&a.storage_), a.len_);
Slice b_slice(reinterpret_cast<const uint8_t*>(&b.storage_), b.len_);
@@ -136,8 +202,18 @@ int Sockaddr::port() const {
}
std::string Sockaddr::host() const {
- DCHECK_EQ(family(), AF_INET);
- return HostPort::AddrToString(storage_.in.sin_addr.s_addr);
+  const sa_family_t fam = family();
+  if (fam == AF_INET) {
+    return HostPort::AddrToString(storage_.in.sin_addr.s_addr);
+  }
+  if (fam == AF_UNIX) {
+    // Calling host() on a UNIX socket is unexpected and trips the DCHECK. The
+    // placeholder is still returned afterwards in case a host() call in a vlog
+    // or error message was not covered by tests.
+    DCHECK(false) << "unexpected host() call on unix socket";
+    return "<unix socket>";
+  }
+  // Any other family is equally unexpected; report which one was seen.
+  DCHECK(false) << "unexpected host() call on socket with family " << fam;
+  return "<unknown socket type>";
}
const struct sockaddr_in& Sockaddr::ipv4_addr() const {
@@ -146,7 +222,14 @@ const struct sockaddr_in& Sockaddr::ipv4_addr() const {
}
std::string Sockaddr::ToString() const {
- return Substitute("$0:$1", host(), port());
+  const sa_family_t fam = family();
+  // IPv4 addresses render as "<host>:<port>".
+  if (fam == AF_INET) {
+    return Substitute("$0:$1", host(), port());
+  }
+  // UNIX domain addresses render as "unix:" followed by UnixDomainPath().
+  if (fam == AF_UNIX) {
+    return Substitute("unix:$0", UnixDomainPath());
+  }
+  // Any other family has no textual form here.
+  return "<invalid sockaddr>";
}
bool Sockaddr::IsWildcard() const {
@@ -155,6 +238,7 @@ bool Sockaddr::IsWildcard() const {
}
bool Sockaddr::IsAnyLocalAddress() const {
+  // UNIX domain socket addresses are always treated as local.
+  if (family() == AF_UNIX) {
+    return true;
+  }
DCHECK_EQ(family(), AF_INET);
return HostPort::IsLoopback(storage_.in.sin_addr.s_addr);
}
diff --git a/src/kudu/util/net/sockaddr.h b/src/kudu/util/net/sockaddr.h
index 99370dc..cd50e50 100644
--- a/src/kudu/util/net/sockaddr.h
+++ b/src/kudu/util/net/sockaddr.h
@@ -46,14 +46,15 @@ class Sockaddr {
Sockaddr(const Sockaddr& other) noexcept;
// Construct from an IPv4 socket address.
- explicit Sockaddr(const struct sockaddr_in &addr);
+ explicit Sockaddr(const struct sockaddr_in& addr);
+ explicit Sockaddr(const struct sockaddr& addr, socklen_t len);
// Return the IPv4 wildcard address.
static Sockaddr Wildcard();
// Assignment operators.
Sockaddr& operator=(const Sockaddr& other) noexcept;
- Sockaddr& operator=(const struct sockaddr_in &addr);
+ Sockaddr& operator=(const struct sockaddr_in& addr);
// Compare two addresses for equality. To be equal, the addresses must have
the same
// family and have the same bytewise representation. Two uninitialized
addresses
@@ -85,6 +86,13 @@ class Sockaddr {
// Returns a bad Status if the input is malformed.
Status ParseString(const std::string& s, uint16_t default_port);
+ // Parse a UNIX domain path, storing the result in this Sockaddr object.
+ // A leading '@' indicates the address should be in the UNIX domain "abstract
+ // namespace" (see man unix(7)).
+ //
+ // May return InvalidArgument if the path is too long.
+ Status ParseUnixDomainPath(const std::string& s);
+
// Returns the dotted-decimal string '1.2.3.4' of the host component of this
address.
std::string host() const;
@@ -96,30 +104,52 @@ class Sockaddr {
// REQUIRES: is an IPv4 address.
int port() const;
+ // Get the path for this address, assuming it's a UNIX domain socket address.
+ //
+ // REQUIRES: family() is AF_UNIX.
+ std::string UnixDomainPath() const;
+
+ // Return the type of UNIX domain socket address. See the unix(7) manpage for more
+ // details.
+ //
+ // REQUIRES: family() is AF_UNIX.
+ enum class UnixAddressType {
+ // A path-based socket visible on the filesystem.
+ kPath,
+ // A stream socket that has not been bound to a pathname using bind(2) has no
+ // name. For example, the address of a peer connected to a server has no name.
+ kUnnamed,
+ // A socket visible in the abstract namespace.
+ kAbstractNamespace,
+ };
+ UnixAddressType unix_address_type() const;
const struct sockaddr* addr() const {
return reinterpret_cast<const sockaddr*>(&storage_);
}
-
- const struct sockaddr_in& ipv4_addr() const;
-
socklen_t addrlen() const {
DCHECK(is_initialized());
return len_;
}
+ const struct sockaddr_in& ipv4_addr() const;
+
sa_family_t family() const {
DCHECK(is_initialized());
return storage_.generic.ss_family;
}
+ bool is_ip() const {
+ return family() == AF_INET;
+ }
+
// Returns the stringified address in '1.2.3.4:<port>' format.
std::string ToString() const;
// Returns true if the address is 0.0.0.0
bool IsWildcard() const;
- // Returns true if the address is 127.*.*.*
+ // Returns true if the address is 127.*.*.* or a unix socket.
bool IsAnyLocalAddress() const;
// Does reverse DNS lookup of the address and stores it in hostname.
@@ -148,6 +178,7 @@ class Sockaddr {
union {
struct sockaddr_storage generic;
struct sockaddr_in in;
+ struct sockaddr_un un;
} storage_;
};
diff --git a/src/kudu/util/net/socket-test.cc b/src/kudu/util/net/socket-test.cc
index 8ecea4e..df966d5 100644
--- a/src/kudu/util/net/socket-test.cc
+++ b/src/kudu/util/net/socket-test.cc
@@ -17,22 +17,27 @@
#include "kudu/util/net/socket.h"
-#include <thread>
+#include <unistd.h>
+#include <cstddef>
#include <cstdint>
-#include <glog/logging.h>
-#include <gtest/gtest.h>
#include <memory>
-#include <stddef.h>
#include <string>
+#include <thread>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
+using std::string;
namespace kudu {
@@ -40,16 +45,36 @@ constexpr size_t kEchoChunkSize = 32 * 1024 * 1024;
class SocketTest : public KuduTest {
protected:
- void DoTest(bool accept, const std::string &message) {
+ Socket listener_;
+ Sockaddr listen_addr_;
+
+ void BindAndListen(const string& addr_str) {
+ Sockaddr address;
+ ASSERT_OK(address.ParseString(addr_str, 0));
+ BindAndListen(address);
+ }
+ void BindAndListenUnix(const string& path) {
Sockaddr address;
- address.ParseString("0.0.0.0", 0);
- Socket listener_;
+ ASSERT_OK(address.ParseUnixDomainPath(path));
+ BindAndListen(address);
+ }
- CHECK_OK(listener_.Init(0));
+ void BindAndListen(const Sockaddr& address) {
+ CHECK_OK(listener_.Init(address.family(), 0));
CHECK_OK(listener_.BindAndListen(address, 0));
- Sockaddr listen_address;
- CHECK_OK(listener_.GetSocketAddress(&listen_address));
+ CHECK_OK(listener_.GetSocketAddress(&listen_addr_));
+ }
+
+ Socket ConnectToListeningServer() {
+ Socket client;
+ CHECK_OK(client.Init(listen_addr_.family(), 0));
+ CHECK_OK(client.Connect(listen_addr_));
+ CHECK_OK(client.SetRecvTimeout(MonoDelta::FromMilliseconds(100)));
+ return client;
+ }
+ void DoTestServerDisconnects(bool accept, const std::string &message) {
+ NO_FATALS(BindAndListen("0.0.0.0:0"));
std::thread t([&]{
if (accept) {
Sockaddr new_addr;
@@ -62,11 +87,7 @@ class SocketTest : public KuduTest {
}
});
- Socket client;
- ASSERT_OK(client.Init(0));
- ASSERT_OK(client.Connect(listen_address));
- CHECK_OK(client.SetRecvTimeout(MonoDelta::FromMilliseconds(100)));
-
+ Socket client = ConnectToListeningServer();
int n;
std::unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
Status s = client.Recv(buf.get(), kEchoChunkSize, &n);
@@ -77,13 +98,69 @@ class SocketTest : public KuduTest {
t.join();
}
+
+ void DoUnixSocketTest(const string& path) {
+ const string kData = "hello world over a socket";
+
+ NO_FATALS(BindAndListenUnix(path));
+ std::thread t(
+ [&]{
+ Sockaddr new_addr;
+ Socket sock;
+ CHECK_OK(listener_.Accept(&sock, &new_addr, 0));
+
+ // Test GetPeerAddress from server side.
+ Sockaddr peer_addr;
+ CHECK_OK(sock.GetPeerAddress(&peer_addr));
+ CHECK_EQ("unix:<unnamed>", peer_addr.ToString());
+
+ size_t n_written;
+ CHECK_OK(sock.BlockingWrite(
+ reinterpret_cast<const uint8_t*>(kData.data()), kData.size(), &n_written,
+ MonoTime::Now() + MonoDelta::FromSeconds(10)));
+ CHECK_OK(sock.Close());
+ });
+
+ Socket client = ConnectToListeningServer();
+
+ // Test GetPeerAddress from client side.
+ Sockaddr peer_addr;
+ ASSERT_OK(client.GetPeerAddress(&peer_addr));
+ EXPECT_EQ("unix:" + path, peer_addr.ToString());
+
+ size_t n;
+ char buf[kData.size()];
+ ASSERT_OK(client.BlockingRecv(reinterpret_cast<uint8_t*>(buf), kData.size(), &n,
+ MonoTime::Now() + MonoDelta::FromSeconds(5)));
+ t.join();
+ ASSERT_OK(client.Close());
+
+ ASSERT_EQ(n, kData.size());
+ ASSERT_EQ(string(buf, n), kData);
+ }
};
TEST_F(SocketTest, TestRecvReset) {
- DoTest(false, "recv error from 127.0.0.1:[0-9]+: Resource temporarily unavailable");
+ DoTestServerDisconnects(false, "recv error from 127.0.0.1:[0-9]+: "
+ "Resource temporarily unavailable");
}
TEST_F(SocketTest, TestRecvEOF) {
- DoTest(true, "recv got EOF from 127.0.0.1:[0-9]+");
+ DoTestServerDisconnects(true, "recv got EOF from 127.0.0.1:[0-9]+");
+}
+
+TEST_F(SocketTest, TestUnixSocketAbstractNamespace) {
+ DoUnixSocketTest(strings::Substitute("@kudu-test-pid-$0", getpid()));
}
+TEST_F(SocketTest, TestUnixSocketFilesystemPath) {
+ // Use a path in /tmp/ instead of the normal GetTestPath approach because
+ // unix domain socket paths are limited in length. The test directory
+ // may be too long.
+ string path = strings::Substitute("/tmp/kudu-test-pid-$0", getpid());
+ SCOPED_CLEANUP({
+ unlink(path.c_str());
+ });
+ DoUnixSocketTest(path);
+}
+
} // namespace kudu
diff --git a/src/kudu/util/net/socket.cc b/src/kudu/util/net/socket.cc
index b0ce530..26a8ccc 100644
--- a/src/kudu/util/net/socket.cc
+++ b/src/kudu/util/net/socket.cc
@@ -73,6 +73,10 @@ Socket::Socket(int fd)
: fd_(fd) {
}
+Socket::Socket(Socket&& other) noexcept
+ : fd_(other.Release()) {
+}
+
void Socket::Reset(int fd) {
ignore_result(Close());
fd_ = fd;
@@ -130,9 +134,9 @@ bool Socket::IsTemporarySocketError(int err) {
#if defined(__linux__)
-Status Socket::Init(int flags) {
+Status Socket::Init(int family, int flags) {
int nonblocking_flag = (flags & FLAG_NONBLOCKING) ? SOCK_NONBLOCK : 0;
- Reset(::socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC | nonblocking_flag, 0));
+ Reset(::socket(family, SOCK_STREAM | SOCK_CLOEXEC | nonblocking_flag, 0));
if (fd_ < 0) {
int err = errno;
return Status::NetworkError("error opening socket", ErrnoToString(err), err);
@@ -143,8 +147,8 @@ Status Socket::Init(int flags) {
#else
-Status Socket::Init(int flags) {
- Reset(::socket(AF_INET, SOCK_STREAM, 0));
+Status Socket::Init(int family, int flags) {
+ Reset(::socket(family, SOCK_STREAM, 0));
if (fd_ < 0) {
int err = errno;
return Status::NetworkError("error opening socket", ErrnoToString(err), err);
@@ -272,26 +276,26 @@ Status Socket::Listen(int listen_queue_size) {
}
Status Socket::GetSocketAddress(Sockaddr *cur_addr) const {
- struct sockaddr_in sin;
- socklen_t len = sizeof(sin);
+ struct sockaddr_storage ss;
+ socklen_t len = sizeof(ss);
DCHECK_GE(fd_, 0);
- if (::getsockname(fd_, reinterpret_cast<struct sockaddr*>(&sin), &len) == -1) {
+ if (::getsockname(fd_, reinterpret_cast<struct sockaddr*>(&ss), &len) == -1) {
int err = errno;
return Status::NetworkError("getsockname error", ErrnoToString(err), err);
}
- *cur_addr = sin;
+ *cur_addr = Sockaddr(reinterpret_cast<struct sockaddr&>(ss), len);
return Status::OK();
}
Status Socket::GetPeerAddress(Sockaddr *cur_addr) const {
- struct sockaddr_in sin;
- socklen_t len = sizeof(sin);
+ struct sockaddr_storage addr;
+ socklen_t len = sizeof(addr);
DCHECK_GE(fd_, 0);
- if (::getpeername(fd_, reinterpret_cast<struct sockaddr*>(&sin), &len) == -1) {
+ if (::getpeername(fd_, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1) {
int err = errno;
return Status::NetworkError("getpeername error", ErrnoToString(err), err);
}
- *cur_addr = sin;
+ *cur_addr = Sockaddr(reinterpret_cast<const sockaddr&>(addr), len);
return Status::OK();
}
@@ -318,7 +322,8 @@ Status Socket::Bind(const Sockaddr& bind_addr) {
bind_addr.ToString(), ErrnoToString(err)),
Slice(), err);
- if (s.IsNetworkError() && s.posix_code() == EADDRINUSE && bind_addr.port() != 0) {
+ if (s.IsNetworkError() && bind_addr.is_ip() &&
+ s.posix_code() == EADDRINUSE && bind_addr.port() != 0) {
TryRunLsof(bind_addr);
}
return s;
@@ -329,7 +334,7 @@ Status Socket::Bind(const Sockaddr& bind_addr) {
Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) {
TRACE_EVENT0("net", "Socket::Accept");
- struct sockaddr_in addr;
+ struct sockaddr_storage addr;
socklen_t olen = sizeof(addr);
DCHECK_GE(fd_, 0);
#if defined(__linux__)
@@ -358,7 +363,7 @@ Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) {
RETURN_NOT_OK(new_conn->SetCloseOnExec());
#endif // defined(__linux__)
- *remote = addr;
+ *remote = Sockaddr(reinterpret_cast<const sockaddr&>(addr), olen);
TRACE_EVENT_INSTANT1("net", "Accepted", TRACE_EVENT_SCOPE_THREAD,
"remote", remote->ToString());
return Status::OK();
diff --git a/src/kudu/util/net/socket.h b/src/kudu/util/net/socket.h
index 0f561a7..830a389 100644
--- a/src/kudu/util/net/socket.h
+++ b/src/kudu/util/net/socket.h
@@ -40,6 +40,7 @@ class Socket {
// Start managing a socket.
explicit Socket(int fd);
+ Socket(Socket&& other) noexcept;
// Close the socket. Errors will be ignored.
virtual ~Socket();
@@ -64,7 +65,8 @@ class Socket {
// the socket.
static bool IsTemporarySocketError(int err);
- Status Init(int flags); // See FLAG_NONBLOCKING
+ // Init the socket for use with the given family (eg AF_INET or AF_UNIX).
+ Status Init(int family, int flags); // See FLAG_NONBLOCKING
// Set or clear TCP_NODELAY
Status SetNoDelay(bool enabled);