This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 41ebabf2eb618b33fd30ad1821ccbda9d6390010 Author: Andrew Wong <[email protected]> AuthorDate: Sat Aug 28 13:55:47 2021 -0700 [rpc] KUDU-75: refresh DNS entries if proxies hit a network error This patch aims to tackle the following issues that revolve around changes in addresses at runtime. - KUDU-1885: master long-lived tserver proxies need to be re-resolved in case nodes are assigned different addresses; today we just retry at the same location forever. - KUDU-1620: tablet consensus long-lived proxies need to be re-resolved on failure. - C++ clients' usages of RemoteTabletServer also have long-lived proxies and are likely to run into similar problems if tservers are restarted and assigned new physical addresses. It addresses this by plumbing a DnsResolver into the rpc::Proxy class, and chaining the asynchronous callback to an asynchronous refresh of the address with the newly introduced refreshing capabilities of the DnsResolver. The new style of proxy isn't currently used, but a test is added exercising the new functionality. 
Change-Id: I777d169bd3a461294e5721f05071b726ced70f7e Reviewed-on: http://gerrit.cloudera.org:8080/17839 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> --- src/kudu/rpc/CMakeLists.txt | 1 + src/kudu/rpc/connection_id.cc | 4 +- src/kudu/rpc/connection_id.h | 4 + src/kudu/rpc/mt-rpc-test.cc | 4 +- src/kudu/rpc/protoc-gen-krpc.cc | 19 +++- src/kudu/rpc/proxy-test.cc | 221 ++++++++++++++++++++++++++++++++++++++++ src/kudu/rpc/proxy.cc | 156 ++++++++++++++++++++++++++-- src/kudu/rpc/proxy.h | 67 +++++++++++- src/kudu/rpc/rpc-test-base.h | 26 ++--- src/kudu/rpc/rpc-test.cc | 82 +++++++-------- src/kudu/rpc/rpc_stub-test.cc | 2 +- src/kudu/util/net/sockaddr.cc | 3 + src/kudu/util/net/sockaddr.h | 2 +- 13 files changed, 517 insertions(+), 74 deletions(-) diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt index 273842d..c8d831e 100644 --- a/src/kudu/rpc/CMakeLists.txt +++ b/src/kudu/rpc/CMakeLists.txt @@ -129,6 +129,7 @@ ADD_KUDU_TEST(exactly_once_rpc-test PROCESSORS 10) ADD_KUDU_TEST(mt-rpc-test RUN_SERIAL true) ADD_KUDU_TEST(negotiation-test) ADD_KUDU_TEST(periodic-test) +ADD_KUDU_TEST(proxy-test) ADD_KUDU_TEST(reactor-test) ADD_KUDU_TEST(request_tracker-test) ADD_KUDU_TEST(rpc-bench RUN_SERIAL true) diff --git a/src/kudu/rpc/connection_id.cc b/src/kudu/rpc/connection_id.cc index 9728f01..8a14d83 100644 --- a/src/kudu/rpc/connection_id.cc +++ b/src/kudu/rpc/connection_id.cc @@ -20,7 +20,7 @@ #include <cstddef> #include <utility> -#include <boost/functional/hash/hash.hpp> +#include <boost/container_hash/extensions.hpp> #include <glog/logging.h> #include "kudu/gutil/strings/substitute.h" @@ -52,7 +52,7 @@ void ConnectionId::set_network_plane(string network_plane) { string ConnectionId::ToString() const { string remote; - if (remote_.is_ip() && hostname_ != remote_.host()) { + if (remote_.is_initialized() && remote_.is_ip() && hostname_ != remote_.host()) { remote = strings::Substitute("$0 ($1)", remote_.ToString(), hostname_); } 
else { remote = remote_.ToString(); diff --git a/src/kudu/rpc/connection_id.h b/src/kudu/rpc/connection_id.h index 6ec98f7..0aed953 100644 --- a/src/kudu/rpc/connection_id.h +++ b/src/kudu/rpc/connection_id.h @@ -45,6 +45,10 @@ class ConnectionId { const std::string& hostname() const { return hostname_; } + void set_remote(const Sockaddr& remote) { + remote_ = remote; + } + // The credentials of the user associated with this connection, if any. void set_user_credentials(UserCredentials user_credentials); diff --git a/src/kudu/rpc/mt-rpc-test.cc b/src/kudu/rpc/mt-rpc-test.cc index 09bda4b..3496e26 100644 --- a/src/kudu/rpc/mt-rpc-test.cc +++ b/src/kudu/rpc/mt-rpc-test.cc @@ -71,7 +71,7 @@ class MultiThreadedRpcTest : public RpcTestBase { CHECK_OK(CreateMessenger("ClientSC", &client_messenger)); Proxy p(client_messenger, server_addr, server_addr.host(), GenericCalculatorService::static_service_name()); - *result = DoTestSyncCall(p, method_name); + *result = DoTestSyncCall(&p, method_name); latch->CountDown(); } @@ -93,7 +93,7 @@ class MultiThreadedRpcTest : public RpcTestBase { int i = 0; while (true) { i++; - Status s = DoTestSyncCall(p, method_name); + Status s = DoTestSyncCall(&p, method_name); if (!s.ok()) { // Return on first failure. LOG(INFO) << "Call failed. Shutting down client thread. 
Ran " << i << " calls: " diff --git a/src/kudu/rpc/protoc-gen-krpc.cc b/src/kudu/rpc/protoc-gen-krpc.cc index 2226160..e8d8e55 100644 --- a/src/kudu/rpc/protoc-gen-krpc.cc +++ b/src/kudu/rpc/protoc-gen-krpc.cc @@ -575,6 +575,10 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator { " std::shared_ptr<::kudu::rpc::Messenger> messenger,\n" " const ::kudu::Sockaddr& sockaddr,\n" " std::string hostname);\n" + " $service_name$Proxy(\n" + " std::shared_ptr<::kudu::rpc::Messenger> messenger,\n" + " const ::kudu::HostPort& hp,\n" + " DnsResolver* dns_resolver);\n" " ~$service_name$Proxy();\n"); for (int method_idx = 0; method_idx < service->method_count(); @@ -639,6 +643,15 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator { " std::move(hostname),\n" " \"$full_service_name$\") {\n" "}\n" + "$service_name$Proxy::$service_name$Proxy(\n" + " std::shared_ptr<::kudu::rpc::Messenger> messenger,\n" + " const ::kudu::HostPort& hp,\n" + " DnsResolver* dns_resolver)\n" + " : Proxy(std::move(messenger),\n" + " hp,\n" + " dns_resolver,\n" + " \"$full_service_name$\") {\n" + "}\n" "\n" "$service_name$Proxy::~$service_name$Proxy() {\n" "}\n"); @@ -652,7 +665,8 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator { " const $request$& req,\n" " $response$* resp,\n" " ::kudu::rpc::RpcController* controller) {\n" - " return SyncRequest(\"$rpc_name$\", req, resp, controller);\n" + " static const std::string kRpcName = \"$rpc_name$\";\n" + " return SyncRequest(kRpcName, req, resp, controller);\n" "}\n" "\n" "void $service_name$Proxy::$rpc_name$Async(\n" @@ -660,7 +674,8 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator { " $response$* resp,\n" " ::kudu::rpc::RpcController* controller,\n" " const ::kudu::rpc::ResponseCallback& callback) {\n" - " AsyncRequest(\"$rpc_name$\", req, resp, controller, callback);\n" + " static const std::string kRpcName = \"$rpc_name$\";\n" + " AsyncRequest(kRpcName, 
req, resp, controller, callback);\n" "}\n"); subs->Pop(); // method } diff --git a/src/kudu/rpc/proxy-test.cc b/src/kudu/rpc/proxy-test.cc new file mode 100644 index 0000000..34b626c --- /dev/null +++ b/src/kudu/rpc/proxy-test.cc @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/rpc/proxy.h" + +#include <cstdint> +#include <memory> +#include <string> +#include <thread> +#include <vector> + +#include <gflags/gflags_declare.h> +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/rpc-test-base.h" +#include "kudu/rpc/rpc_controller.h" +#include "kudu/rpc/rtest.pb.h" +#include "kudu/rpc/service_pool.h" +#include "kudu/util/monotime.h" +#include "kudu/util/net/dns_resolver.h" +#include "kudu/util/net/net_util.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" + +DECLARE_string(dns_addr_resolution_override); + +using std::shared_ptr; +using std::string; +using std::thread; +using std::vector; +using strings::Substitute; + +namespace kudu { +namespace rpc { + +class Messenger; + +namespace { + +constexpr uint16_t kPort = 1111; +constexpr const char* kFakeHost = "fakehost"; +const HostPort kFakeHostPort(kFakeHost, kPort); + +Status SendRequest(Proxy* p) { + SleepRequestPB req; + req.set_sleep_micros(100 * 1000); // 100ms + SleepResponsePB resp; + RpcController controller; + controller.set_timeout(MonoDelta::FromMilliseconds(10000)); + return p->SyncRequest(GenericCalculatorService::kSleepMethodName, req, &resp, &controller); +} + +} // anonymous namespace + +class RpcProxyTest : public RpcTestBase { +}; + +// Test that proxies initialized with a DnsResolver return errors when +// receiving a non-transient error. +TEST_F(RpcProxyTest, TestProxyReturnsOnNonTransientError) { + SKIP_IF_SLOW_NOT_ALLOWED(); // This test waits for a timeout. 
+ + shared_ptr<Messenger> client_messenger; + ASSERT_OK(CreateMessenger("client_messenger", &client_messenger)); + DnsResolver dns_resolver(1, 1024 * 1024); + Proxy p(client_messenger, kFakeHostPort, &dns_resolver, + CalculatorService::static_service_name()); + p.Init(); + Status s = SendRequest(&p); + ASSERT_TRUE(s.IsNetworkError()) << s.ToString(); + + // If we do resolve to an address that turns out to be bogus, we should + // time out when negotiating. + FLAGS_dns_addr_resolution_override = Substitute("$0=1.1.1.1", kFakeHost); + s = SendRequest(&p); + ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); +} + +// Test that ensures a proxy initialized with an address will use that address. +TEST_F(RpcProxyTest, TestProxyUsesInitialAddr) { + string ip1 = GetBindIpForDaemon(/*index*/1, kDefaultBindMode); + Sockaddr server_addr; + ASSERT_OK(server_addr.ParseString(ip1, kPort)); + ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr)); + + // Despite our proxy being configured with a fake host, our request should + // still go through since we call Init() with a valid address. + shared_ptr<Messenger> client_messenger; + ASSERT_OK(CreateMessenger("client_messenger", &client_messenger)); + DnsResolver dns_resolver(1, 1024 * 1024); + Proxy p(client_messenger, kFakeHostPort, &dns_resolver, + CalculatorService::static_service_name()); + p.Init(server_addr); + ASSERT_OK(SendRequest(&p)); + + server_messenger_.reset(); + service_pool_.reset(); + + // With our server down, the request should fail. + Status s = SendRequest(&p); + ASSERT_TRUE(s.IsNetworkError()) << s.ToString(); + + // Once we bring up a new server and allow our proxy to resolve it, the + // request should succeed. 
+ string ip2 = GetBindIpForDaemon(/*index*/2, kDefaultBindMode); + Sockaddr second_addr; + ASSERT_OK(second_addr.ParseString(ip2, kPort)); + ASSERT_OK(StartTestServerWithGeneratedCode(&second_addr)); + FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, second_addr.ToString()); + ASSERT_OK(SendRequest(&p)); +} + +TEST_F(RpcProxyTest, TestNonResolvingProxyIgnoresInit) { + string ip = GetBindIpForDaemon(/*index*/1, kDefaultBindMode); + Sockaddr server_addr; + ASSERT_OK(server_addr.ParseString(ip, kPort)); + ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr)); + + shared_ptr<Messenger> client_messenger; + ASSERT_OK(CreateMessenger("client_messenger", &client_messenger)); + DnsResolver dns_resolver(1, 1024 * 1024); + HostPort hp(ip, kPort); + Proxy p(client_messenger, hp, &dns_resolver, CalculatorService::static_service_name()); + + // Call Init() with a fake address. Because this proxy isn't configured for + // address re-resolution, the new address is ignored. + Sockaddr fake_addr; + ASSERT_OK(fake_addr.ParseString("1.1.1.1", kPort)); + p.Init(fake_addr); + + // We should thus have no trouble sending a request. + ASSERT_OK(SendRequest(&p)); +} + +// Start a proxy with a DNS resolver that maps a hostname to the address bound +// by the server. Then restart the server but bind to a different address, and +// update the DNS resolver to map the same hostname to the different address. +// The proxy should eventually be usable. 
+TEST_F(RpcProxyTest, TestProxyReresolvesAddress) { + string ip1 = GetBindIpForDaemon(/*index*/1, kDefaultBindMode); + Sockaddr server_addr; + ASSERT_OK(server_addr.ParseString(ip1, kPort)); + ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr)); + FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, server_addr.ToString()); + + shared_ptr<Messenger> client_messenger; + ASSERT_OK(CreateMessenger("client_messenger", &client_messenger)); + DnsResolver dns_resolver(1, 1024 * 1024); + Proxy p(client_messenger, kFakeHostPort, &dns_resolver, + CalculatorService::static_service_name()); + p.Init(); + ASSERT_OK(SendRequest(&p)); + + string ip2 = GetBindIpForDaemon(/*index*/2, kDefaultBindMode); + Sockaddr second_addr; + ASSERT_OK(second_addr.ParseString(ip2, kPort)); + ASSERT_OK(StartTestServerWithGeneratedCode(&second_addr)); + FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, second_addr.ToString()); + ASSERT_OK(SendRequest(&p)); +} + +TEST_F(RpcProxyTest, TestProxyReresolvesAddressFromThreads) { + constexpr const int kNumThreads = 4; + + string ip1 = GetBindIpForDaemon(/*index*/1, kDefaultBindMode); + Sockaddr server_addr; + ASSERT_OK(server_addr.ParseString(ip1, kPort)); + ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr)); + FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, server_addr.ToString()); + + shared_ptr<Messenger> client_messenger; + ASSERT_OK(CreateMessenger("client_messenger", &client_messenger)); + DnsResolver dns_resolver(1, 1024 * 1024); + Proxy p(client_messenger, kFakeHostPort, &dns_resolver, + CalculatorService::static_service_name()); + p.Init(); + ASSERT_OK(SendRequest(&p)); + + string ip2 = GetBindIpForDaemon(/*index*/2, kDefaultBindMode); + Sockaddr second_addr; + ASSERT_OK(second_addr.ParseString(ip2, kPort)); + ASSERT_OK(StartTestServerWithGeneratedCode(&second_addr)); + FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, second_addr.ToString()); + + vector<Status> 
errors(kNumThreads); + vector<thread> threads; + threads.reserve(kNumThreads); + for (int i = 0; i < kNumThreads; i++) { + threads.emplace_back([&, i] { + errors[i] = SendRequest(&p); + }); + } + for (auto& t : threads) { + t.join(); + } + for (const auto& e : errors) { + EXPECT_OK(e); + } +} + +} // namespace rpc +} // namespace kudu diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc index 8bd45fb..8d810b8 100644 --- a/src/kudu/rpc/proxy.cc +++ b/src/kudu/rpc/proxy.cc @@ -21,9 +21,11 @@ #include <iostream> #include <memory> #include <utility> +#include <vector> #include <glog/logging.h> +#include "kudu/gutil/port.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/rpc/messenger.h" #include "kudu/rpc/outbound_call.h" @@ -31,6 +33,8 @@ #include "kudu/rpc/response_callback.h" #include "kudu/rpc/rpc_controller.h" #include "kudu/rpc/user_credentials.h" +#include "kudu/util/net/dns_resolver.h" +#include "kudu/util/net/net_util.h" #include "kudu/util/net/sockaddr.h" #include "kudu/util/notification.h" #include "kudu/util/status.h" @@ -39,6 +43,9 @@ using google::protobuf::Message; using std::string; using std::shared_ptr; +using std::unique_ptr; +using std::vector; +using strings::Substitute; namespace kudu { namespace rpc { @@ -48,6 +55,7 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger, string hostname, string service_name) : service_name_(std::move(service_name)), + dns_resolver_(nullptr), messenger_(std::move(messenger)), is_started_(false) { CHECK(messenger_ != nullptr); @@ -67,19 +75,72 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger, conn_id_ = ConnectionId(remote, std::move(hostname), std::move(creds)); } +Proxy::Proxy(std::shared_ptr<Messenger> messenger, + HostPort hp, + DnsResolver* dns_resolver, + string service_name) + : service_name_(std::move(service_name)), + hp_(std::move(hp)), + dns_resolver_(dns_resolver), + messenger_(std::move(messenger)), + is_started_(false) { + CHECK(messenger_ != nullptr); + 
DCHECK(!service_name_.empty()) << "Proxy service name must not be blank"; + DCHECK(hp_.Initialized()); +} + +Sockaddr* Proxy::GetSingleSockaddr(std::vector<Sockaddr>* addrs) const { + DCHECK(!addrs->empty()); + if (PREDICT_FALSE(addrs->size() > 1)) { + LOG(WARNING) << Substitute( + "$0 proxy host/port $1 resolves to $2 different addresses. Using $3", + service_name_, hp_.ToString(), addrs->size(), (*addrs)[0].ToString()); + } + return &(*addrs)[0]; +} + +void Proxy::Init(Sockaddr addr) { + if (!dns_resolver_) { + return; + } + // By default, we set the real user to the currently logged-in user. + // Effective user and password remain blank. + string real_user; + Status s = GetLoggedInUser(&real_user); + if (!s.ok()) { + LOG(WARNING) << "Proxy for " << service_name_ << ": Unable to get logged-in user name: " + << s.ToString() << " before connecting to host/port: " << hp_.ToString(); + } + vector<Sockaddr> addrs; + if (!addr.is_initialized()) { + s = dns_resolver_->ResolveAddresses(hp_, &addrs); + if (PREDICT_TRUE(s.ok() && !addrs.empty())) { + addr = *GetSingleSockaddr(&addrs); + DCHECK(addr.is_initialized()); + addr.set_port(hp_.port()); + // NOTE: it's ok to proceed on failure -- the address will remain + // uninitialized and be re-resolved when sending the next request. 
+ } + } + + UserCredentials creds; + creds.set_real_user(std::move(real_user)); + conn_id_ = ConnectionId(addr, hp_.host(), std::move(creds)); +} + Proxy::~Proxy() { } -void Proxy::AsyncRequest(const string& method, - const google::protobuf::Message& req, - google::protobuf::Message* response, - RpcController* controller, - const ResponseCallback& callback) const { - CHECK(!controller->call_) << "Controller should be reset"; - base::subtle::NoBarrier_Store(&is_started_, true); +void Proxy::EnqueueRequest(const string& method, + const google::protobuf::Message& req, + google::protobuf::Message* response, + RpcController* controller, + const ResponseCallback& callback) const { + ConnectionId connection = conn_id(); + DCHECK(connection.remote().is_initialized()); RemoteMethod remote_method(service_name_, method); controller->call_.reset( - new OutboundCall(conn_id_, remote_method, response, controller, callback)); + new OutboundCall(connection, remote_method, response, controller, callback)); controller->SetRequestParam(req); controller->SetMessenger(messenger_.get()); @@ -88,11 +149,86 @@ void Proxy::AsyncRequest(const string& method, messenger_->QueueOutboundCall(controller->call_); } +void Proxy::RefreshDnsAndEnqueueRequest(const std::string& method, + const google::protobuf::Message& req, + google::protobuf::Message* response, + RpcController* controller, + const ResponseCallback& callback) { + DCHECK(!controller->call_); + vector<Sockaddr>* addrs = new vector<Sockaddr>(); + DCHECK_NOTNULL(dns_resolver_)->RefreshAddressesAsync(hp_, addrs, + [this, &req, &method, callback, response, controller, addrs] (const Status& s) { + unique_ptr<vector<Sockaddr>> unique_addrs(addrs); + // If we fail to resolve the address, treat the call as failed. 
+ if (!s.ok() || addrs->empty()) { + DCHECK(!controller->call_); + // NOTE: we need to keep a reference here because the callback may end up + // destructing the controller and the outbound call, _while_ the callback + // is running from within the call! + auto shared_call = std::make_shared<OutboundCall>( + conn_id(), RemoteMethod{service_name_, method}, response, controller, callback); + controller->call_ = shared_call; + controller->call_->SetFailed(s.CloneAndPrepend("failed to refresh physical address")); + return; + } + auto* addr = GetSingleSockaddr(addrs); + DCHECK(addr->is_initialized()); + addr->set_port(hp_.port()); + { + std::lock_guard<simple_spinlock> l(lock_); + conn_id_.set_remote(*addr); + } + EnqueueRequest(method, req, response, controller, callback); + }); +} + +void Proxy::AsyncRequest(const string& method, + const google::protobuf::Message& req, + google::protobuf::Message* response, + RpcController* controller, + const ResponseCallback& callback) { + CHECK(!controller->call_) << "Controller should be reset"; + base::subtle::NoBarrier_Store(&is_started_, true); + if (!dns_resolver_) { + EnqueueRequest(method, req, response, controller, callback); + return; + } + + // If we haven't successfully initialized the remote, e.g. because the DNS + // lookup failed, refresh the DNS entry and enqueue the request. + bool remote_initialized; + { + std::lock_guard<simple_spinlock> l(lock_); + remote_initialized = conn_id_.remote().is_initialized(); + } + if (!remote_initialized) { + RefreshDnsAndEnqueueRequest(method, req, response, controller, callback); + return; + } + + // Otherwise, just enqueue the request, but retry if there's a network error, + // since it's possible the physical address of the host was changed. We only + // retry once more before calling the callback. 
+ auto refresh_dns_and_cb = [this, &req, &method, + callback, response, controller] () { + // TODO(awong): we should be more specific here -- consider having the RPC + // layer set a flag in the controller that warrants a retry. + if (PREDICT_FALSE(!controller->status().ok())) { + controller->Reset(); + RefreshDnsAndEnqueueRequest(method, req, response, controller, callback); + return; + } + // For any other status, OK or otherwise, just run the callback. + callback(); + }; + EnqueueRequest(method, req, response, controller, refresh_dns_and_cb); +} + Status Proxy::SyncRequest(const string& method, const google::protobuf::Message& req, google::protobuf::Message* resp, - RpcController* controller) const { + RpcController* controller) { Notification note; AsyncRequest(method, req, DCHECK_NOTNULL(resp), controller, [&note]() { note.Notify(); }); @@ -113,7 +249,7 @@ void Proxy::set_network_plane(string network_plane) { } std::string Proxy::ToString() const { - return strings::Substitute("$0@$1", service_name_, conn_id_.ToString()); + return Substitute("$0@$1", service_name_, conn_id_.ToString()); } } // namespace rpc diff --git a/src/kudu/rpc/proxy.h b/src/kudu/rpc/proxy.h index ccf5f18..b0bc512 100644 --- a/src/kudu/rpc/proxy.h +++ b/src/kudu/rpc/proxy.h @@ -17,12 +17,16 @@ #pragma once #include <memory> +#include <mutex> #include <string> +#include <vector> #include "kudu/gutil/atomicops.h" #include "kudu/gutil/macros.h" #include "kudu/rpc/connection_id.h" #include "kudu/rpc/response_callback.h" +#include "kudu/util/locks.h" +#include "kudu/util/net/net_util.h" #include "kudu/util/status.h" namespace google { @@ -33,6 +37,7 @@ class Message; namespace kudu { +class DnsResolver; class Sockaddr; namespace rpc { @@ -65,8 +70,27 @@ class Proxy { std::string hostname, std::string service_name); + // TODO(awong): consider a separate auto-resolving proxy class? 
+ Proxy(std::shared_ptr<Messenger> messenger, + HostPort hp, + DnsResolver* dns_resolver, + std::string service_name); + ~Proxy(); + // If the proxy is configured for address re-resolution (by supplying a + // DnsResolver and HostPort in the constructor), performs an initial + // resolution of the address using the HostPort. If 'addr' is supplied, it is + // used instead of performing resolution (this is useful to initialize + // several proxies with a single external DNS resolution). + // + // Otherwise, this is a no-op. + // + // NOTE: it is always OK to skip calling this method -- if this proxy is + // configured for address re-resolution and this is skipped, the resolution + // will happen upon sending the first request. + void Init(Sockaddr addr = {}); + // Call a remote method asynchronously. // // Typically, users will not call this directly, but rather through @@ -97,14 +121,14 @@ class Proxy { const google::protobuf::Message& req, google::protobuf::Message* resp, RpcController* controller, - const ResponseCallback& callback) const; + const ResponseCallback& callback); // The same as AsyncRequest(), except that the call blocks until the call // finishes. If the call fails, returns a non-OK result. Status SyncRequest(const std::string& method, const google::protobuf::Message& req, google::protobuf::Message* resp, - RpcController* controller) const; + RpcController* controller); // Set the user credentials which should be used to log in. void set_user_credentials(UserCredentials user_credentials); @@ -121,9 +145,48 @@ class Proxy { std::string ToString() const; private: + // Asynchronously refreshes the DNS, enqueueing the given request upon + // success, or failing the call and calling the callback upon failure. 
+ void RefreshDnsAndEnqueueRequest(const std::string& method, + const google::protobuf::Message& req, + google::protobuf::Message* response, + RpcController* controller, + const ResponseCallback& callback); + + // Queues the given request as an outbound call using the given messenger, + // controller, and response. + void EnqueueRequest(const std::string& method, + const google::protobuf::Message& req, + google::protobuf::Message* response, + RpcController* controller, + const ResponseCallback& callback) const; + + // Returns a single Sockaddr from the 'addrs', logging a warning if there is + // more than one to choose from. + Sockaddr* GetSingleSockaddr(std::vector<Sockaddr>* addrs) const; + + ConnectionId conn_id() const { + std::lock_guard<simple_spinlock> l(lock_); + return conn_id_; + } + const std::string service_name_; + HostPort hp_; + DnsResolver* dns_resolver_; std::shared_ptr<Messenger> messenger_; + + // TODO(awong): consider implementing some lock-free list of ConnectionIds + // instead of taking a lock every time we want to get the "current" + // ConnectionId. + // + // Connection ID used by this proxy. Once the proxy has started sending + // requests, the connection ID may be updated in response to calls (e.g. if + // we re-resolved the physical address in response to an invalid DNS entry). 
+ // As such, 'conn_id_' is protected by 'lock_', and should be copied and + // passed around, rather than used directly + mutable simple_spinlock lock_; ConnectionId conn_id_; + mutable Atomic32 is_started_; DISALLOW_COPY_AND_ASSIGN(Proxy); diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h index 4a87b0f..602fa6e 100644 --- a/src/kudu/rpc/rpc-test-base.h +++ b/src/kudu/rpc/rpc-test-base.h @@ -467,8 +467,8 @@ class RpcTestBase : public KuduTest { return bld.Build(messenger); } - Status DoTestSyncCall(const Proxy &p, const char *method, - CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS) { + static Status DoTestSyncCall(Proxy* p, const char *method, + CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS) { AddRequestPB req; req.set_x(rand()); req.set_y(rand()); @@ -476,13 +476,13 @@ class RpcTestBase : public KuduTest { RpcController controller; controller.set_timeout(MonoDelta::FromMilliseconds(10000)); controller.set_credentials_policy(policy); - RETURN_NOT_OK(p.SyncRequest(method, req, &resp, &controller)); + RETURN_NOT_OK(p->SyncRequest(method, req, &resp, &controller)); CHECK_EQ(req.x() + req.y(), resp.result()); return Status::OK(); } - void DoTestSidecar(const Proxy &p, int size1, int size2) { +static void DoTestSidecar(Proxy* p, int size1, int size2) { const uint32_t kSeed = 12345; SendTwoStringsRequestPB req; @@ -493,8 +493,8 @@ class RpcTestBase : public KuduTest { SendTwoStringsResponsePB resp; RpcController controller; controller.set_timeout(MonoDelta::FromMilliseconds(10000)); - CHECK_OK(p.SyncRequest(GenericCalculatorService::kSendTwoStringsMethodName, - req, &resp, &controller)); + CHECK_OK(p->SyncRequest(GenericCalculatorService::kSendTwoStringsMethodName, + req, &resp, &controller)); Slice first = GetSidecarPointer(controller, resp.sidecar1(), size1); Slice second = GetSidecarPointer(controller, resp.sidecar2(), size2); @@ -510,11 +510,11 @@ class RpcTestBase : public KuduTest { CHECK_EQ(Slice(expected), 
second); } - static Status DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) { + static Status DoTestOutgoingSidecar(Proxy* p, int size1, int size2) { return DoTestOutgoingSidecar(p, {std::string(size1, 'a'), std::string(size2, 'b')}); } - static Status DoTestOutgoingSidecar(const Proxy& p, const std::vector<std::string>& strings) { + static Status DoTestOutgoingSidecar(Proxy* p, const std::vector<std::string>& strings) { PushStringsRequestPB request; RpcController controller; @@ -525,8 +525,8 @@ class RpcTestBase : public KuduTest { } PushStringsResponsePB resp; - KUDU_RETURN_NOT_OK(p.SyncRequest(GenericCalculatorService::kPushStringsMethodName, - request, &resp, &controller)); + KUDU_RETURN_NOT_OK(p->SyncRequest(GenericCalculatorService::kPushStringsMethodName, + request, &resp, &controller)); for (int i = 0; i < strings.size(); i++) { CHECK_EQ(strings[i].size(), resp.sizes(i)); CHECK_EQ(crc::Crc32c(strings[i].data(), strings[i].size()), @@ -536,11 +536,11 @@ class RpcTestBase : public KuduTest { return Status::OK(); } - void DoTestOutgoingSidecarExpectOK(const Proxy &p, int size1, int size2) { + static void DoTestOutgoingSidecarExpectOK(Proxy* p, int size1, int size2) { CHECK_OK(DoTestOutgoingSidecar(p, size1, size2)); } - static void DoTestExpectTimeout(const Proxy& p, + static void DoTestExpectTimeout(Proxy* p, const MonoDelta& timeout, bool will_be_cancelled = false, bool* is_negotiaton_error = nullptr) { @@ -554,7 +554,7 @@ class RpcTestBase : public KuduTest { c.set_timeout(timeout); Stopwatch sw; sw.start(); - Status s = p.SyncRequest(GenericCalculatorService::kSleepMethodName, req, &resp, &c); + Status s = p->SyncRequest(GenericCalculatorService::kSleepMethodName, req, &resp, &c); sw.stop(); ASSERT_FALSE(s.ok()); if (is_negotiaton_error != nullptr) { diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc index a3eaa04..7a3c657 100644 --- a/src/kudu/rpc/rpc-test.cc +++ b/src/kudu/rpc/rpc-test.cc @@ -207,7 +207,7 @@ TEST_P(TestRpc, 
TestNegotiationDeadlock) { Proxy p(messenger, server_addr, kRemoteHostName, GenericCalculatorService::static_service_name()); - ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName)); } // Test making successful RPC calls. @@ -227,7 +227,7 @@ TEST_P(TestRpc, TestCall) { expected_remote_str(server_addr))); for (int i = 0; i < 10; i++) { - ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName)); } } @@ -259,7 +259,7 @@ TEST_P(TestRpc, TestCallWithChainCertAndChainCA) { "{remote=$0, user_credentials=", expected_remote_str(server_addr))); - ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName)); } // Test for KUDU-2041. @@ -290,7 +290,7 @@ TEST_P(TestRpc, TestCallWithChainCertAndRootCA) { "{remote=$0, user_credentials=", expected_remote_str(server_addr))); - ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName)); } // Test making successful RPC calls while using a TLS certificate with a password protected @@ -326,7 +326,7 @@ TEST_P(TestRpc, TestCallWithPasswordProtectedKey) { "{remote=$0, user_credentials=", expected_remote_str(server_addr))); - ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName)); } // Test that using a TLS certificate with a password protected private key and providing @@ -368,7 +368,7 @@ TEST_P(TestRpc, TestCallToBadServer) { // Loop a few calls to make sure that we properly set up and tear down // the connections. 
for (int i = 0; i < 5; i++) { - Status s = DoTestSyncCall(p, GenericCalculatorService::kAddMethodName); + Status s = DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName); LOG(INFO) << "Status: " << s.ToString(); ASSERT_TRUE(s.IsNetworkError()) << "unexpected status: " << s.ToString(); } @@ -388,7 +388,7 @@ TEST_P(TestRpc, TestInvalidMethodCall) { GenericCalculatorService::static_service_name()); // Call the method which fails. - Status s = DoTestSyncCall(p, "ThisMethodDoesNotExist"); + Status s = DoTestSyncCall(&p, "ThisMethodDoesNotExist"); ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString(); ASSERT_STR_CONTAINS(s.ToString(), "bad method"); } @@ -406,7 +406,7 @@ TEST_P(TestRpc, TestWrongService) { Proxy p(client_messenger, server_addr, "localhost", "WrongServiceName"); // Call the method which fails. - Status s = DoTestSyncCall(p, "ThisMethodDoesNotExist"); + Status s = DoTestSyncCall(&p, "ThisMethodDoesNotExist"); ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString(); ASSERT_STR_CONTAINS(s.ToString(), "Service unavailable: service WrongServiceName " @@ -415,7 +415,7 @@ TEST_P(TestRpc, TestWrongService) { // If the server has been marked as having registered all services, we should // expect a "not found" error instead. 
server_messenger_->SetServicesRegistered(); - s = DoTestSyncCall(p, "ThisMethodDoesNotExist"); + s = DoTestSyncCall(&p, "ThisMethodDoesNotExist"); ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString(); ASSERT_STR_CONTAINS(s.ToString(), "Not found: service WrongServiceName " @@ -449,7 +449,7 @@ TEST_P(TestRpc, TestHighFDs) { ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl())); Proxy p(client_messenger, server_addr, kRemoteHostName, GenericCalculatorService::static_service_name()); - ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName)); } // Test that connections are kept alive between calls. @@ -470,7 +470,7 @@ TEST_P(TestRpc, TestConnectionKeepalive) { Proxy p(client_messenger, server_addr, kRemoteHostName, GenericCalculatorService::static_service_name()); - ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName)); SleepFor(MonoDelta::FromMilliseconds(5)); @@ -514,7 +514,7 @@ TEST_P(TestRpc, TestConnectionAlwaysKeepalive) { Proxy p(client_messenger, server_addr, kRemoteHostName, GenericCalculatorService::static_service_name()); - ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName)); ReactorMetrics metrics; ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); @@ -651,7 +651,7 @@ TEST_P(TestRpc, TestReopenOutboundConnections) { // Run several iterations, just in case. 
for (int i = 0; i < 32; ++i) { - ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName)); ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); ASSERT_EQ(0, metrics.total_client_connections_); ASSERT_EQ(i + 1, metrics.total_server_connections_); @@ -692,7 +692,7 @@ TEST_P(TestRpc, TestCredentialsPolicy) { ASSERT_EQ(0, metrics.total_server_connections_); // Make an RPC call with ANY_CREDENTIALS policy. - ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName)); ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); ASSERT_EQ(0, metrics.total_client_connections_); ASSERT_EQ(1, metrics.total_server_connections_); @@ -708,7 +708,7 @@ TEST_P(TestRpc, TestCredentialsPolicy) { // Make an RPC call with PRIMARY_CREDENTIALS policy. Currently open connection // with ANY_CREDENTIALS policy should be closed and a new one established // with PRIMARY_CREDENTIALS policy. - ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName, + ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName, CredentialsPolicy::PRIMARY_CREDENTIALS)); ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); ASSERT_EQ(0, metrics.total_client_connections_); @@ -723,7 +723,7 @@ TEST_P(TestRpc, TestCredentialsPolicy) { // connection with PRIMARY_CREDENTIALS policy should be re-used because // the ANY_CREDENTIALS policy satisfies the PRIMARY_CREDENTIALS policy which // the currently open connection has been established with. 
- ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName)); ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); ASSERT_EQ(0, metrics.total_client_connections_); ASSERT_EQ(2, metrics.total_server_connections_); @@ -769,7 +769,7 @@ TEST_P(TestRpc, TestConnectionNetworkPlane) { ASSERT_EQ(0, metrics.num_client_connections_); // Make an RPC call with the default network plane. - ASSERT_OK(DoTestSyncCall(p1, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(DoTestSyncCall(&p1, GenericCalculatorService::kAddMethodName)); ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); ASSERT_EQ(0, metrics.total_client_connections_); ASSERT_EQ(1, metrics.total_server_connections_); @@ -780,7 +780,7 @@ TEST_P(TestRpc, TestConnectionNetworkPlane) { ASSERT_EQ(1, metrics.num_client_connections_); // Make an RPC call with the non-default network plane. - ASSERT_OK(DoTestSyncCall(p2, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(DoTestSyncCall(&p2, GenericCalculatorService::kAddMethodName)); ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); ASSERT_EQ(0, metrics.total_client_connections_); ASSERT_EQ(2, metrics.total_server_connections_); @@ -792,7 +792,7 @@ TEST_P(TestRpc, TestConnectionNetworkPlane) { // Make an RPC call with the default network plane again and verify that // there are no new connections. 
- ASSERT_OK(DoTestSyncCall(p1, GenericCalculatorService::kAddMethodName)); + ASSERT_OK(DoTestSyncCall(&p1, GenericCalculatorService::kAddMethodName)); ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics)); ASSERT_EQ(0, metrics.total_client_connections_); ASSERT_EQ(2, metrics.total_server_connections_); @@ -870,18 +870,18 @@ TEST_P(TestRpc, TestRpcSidecar) { GenericCalculatorService::static_service_name()); // Test a zero-length sidecar - DoTestSidecar(p, 0, 0); + DoTestSidecar(&p, 0, 0); // Test some small sidecars - DoTestSidecar(p, 123, 456); + DoTestSidecar(&p, 123, 456); // Test some larger sidecars to verify that we properly handle the case where // we can't write the whole response to the socket in a single call. - DoTestSidecar(p, 3000 * 1024, 2000 * 1024); + DoTestSidecar(&p, 3000 * 1024, 2000 * 1024); - DoTestOutgoingSidecarExpectOK(p, 0, 0); - DoTestOutgoingSidecarExpectOK(p, 123, 456); - DoTestOutgoingSidecarExpectOK(p, 3000 * 1024, 2000 * 1024); + DoTestOutgoingSidecarExpectOK(&p, 0, 0); + DoTestOutgoingSidecarExpectOK(&p, 123, 456); + DoTestOutgoingSidecarExpectOK(&p, 3000 * 1024, 2000 * 1024); } // Test sending the maximum number of sidecars, each of them being a single @@ -903,7 +903,7 @@ TEST_P(TestRpc, TestMaxSmallSidecars) { for (auto& s : strings) { s = RandomString(2, &rng); } - ASSERT_OK(DoTestOutgoingSidecar(p, strings)); + ASSERT_OK(DoTestOutgoingSidecar(&p, strings)); } TEST_P(TestRpc, TestRpcSidecarLimits) { @@ -1008,15 +1008,15 @@ TEST_P(TestRpc, TestCallTimeout) { // Test a very short timeout - we expect this will time out while the // call is still trying to connect, or in the send queue. This was triggering ASAN failures // before. - NO_FATALS(DoTestExpectTimeout(p, MonoDelta::FromNanoseconds(1))); + NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromNanoseconds(1))); // Test a longer timeout - expect this will time out after we send the request, // but shorter than our threshold for two-stage timeout handling. 
- NO_FATALS(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(200))); + NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(200))); // Test a longer timeout - expect this will trigger the "two-stage timeout" // code path. - NO_FATALS(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(1500))); + NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(1500))); } // Inject 500ms delay in negotiation, and send a call with a short timeout, followed by @@ -1035,8 +1035,8 @@ TEST_P(TestRpc, TestCallTimeoutDoesntAffectNegotiation) { GenericCalculatorService::static_service_name()); FLAGS_rpc_negotiation_inject_delay_ms = 500; - NO_FATALS(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(50))); - ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName)); + NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(50))); + ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName)); // Only the second call should have been received by the server, because we // don't bother sending an already-timed-out call. 
@@ -1079,7 +1079,7 @@ TEST_F(TestRpc, TestNegotiationTimeout) { bool is_negotiation_error = false; NO_FATALS(DoTestExpectTimeout( - p, MonoDelta::FromMilliseconds(100), false, &is_negotiation_error)); + &p, MonoDelta::FromMilliseconds(100), false, &is_negotiation_error)); EXPECT_TRUE(is_negotiation_error); } @@ -1374,14 +1374,14 @@ TEST_P(TestRpc, TestCancellation) { case OutboundCall::ON_OUTBOUND_QUEUE: case OutboundCall::SENDING: case OutboundCall::SENT: - ASSERT_TRUE(DoTestOutgoingSidecar(p, 0, 0).IsAborted()); - ASSERT_TRUE(DoTestOutgoingSidecar(p, 123, 456).IsAborted()); - ASSERT_TRUE(DoTestOutgoingSidecar(p, 3000 * 1024, 2000 * 1024).IsAborted()); - DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(timeout_ms), true); + ASSERT_TRUE(DoTestOutgoingSidecar(&p, 0, 0).IsAborted()); + ASSERT_TRUE(DoTestOutgoingSidecar(&p, 123, 456).IsAborted()); + ASSERT_TRUE(DoTestOutgoingSidecar(&p, 3000 * 1024, 2000 * 1024).IsAborted()); + DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(timeout_ms), true); break; case OutboundCall::NEGOTIATION_TIMED_OUT: case OutboundCall::TIMED_OUT: - DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(1000)); + DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(1000)); break; case OutboundCall::CANCELLED: break; @@ -1399,9 +1399,9 @@ TEST_P(TestRpc, TestCancellation) { break; } case OutboundCall::FINISHED_SUCCESS: - DoTestOutgoingSidecarExpectOK(p, 0, 0); - DoTestOutgoingSidecarExpectOK(p, 123, 456); - DoTestOutgoingSidecarExpectOK(p, 3000 * 1024, 2000 * 1024); + DoTestOutgoingSidecarExpectOK(&p, 0, 0); + DoTestOutgoingSidecarExpectOK(&p, 123, 456); + DoTestOutgoingSidecarExpectOK(&p, 3000 * 1024, 2000 * 1024); break; } } @@ -1568,7 +1568,7 @@ TEST_P(TestRpc, TestPerformanceBySocketType) { Stopwatch sw(Stopwatch::ALL_THREADS); sw.start(); for (int i = 0; i < kNumMb / kMbPerRpc; i++) { - DoTestOutgoingSidecar(p, sidecars); + DoTestOutgoingSidecar(&p, sidecars); } sw.stop(); LOG(INFO) << strings::Substitute( diff --git 
a/src/kudu/rpc/rpc_stub-test.cc b/src/kudu/rpc/rpc_stub-test.cc index 59c7667..d446227 100644 --- a/src/kudu/rpc/rpc_stub-test.cc +++ b/src/kudu/rpc/rpc_stub-test.cc @@ -357,7 +357,7 @@ TEST_F(RpcStubTest, TestCallMissingMethod) { Proxy p(client_messenger_, server_addr_, server_addr_.host(), CalculatorService::static_service_name()); - Status s = DoTestSyncCall(p, "DoesNotExist"); + Status s = DoTestSyncCall(&p, "DoesNotExist"); ASSERT_TRUE(s.IsRemoteError()) << "Bad status: " << s.ToString(); ASSERT_STR_CONTAINS(s.ToString(), "with an invalid method name: DoesNotExist"); } diff --git a/src/kudu/util/net/sockaddr.cc b/src/kudu/util/net/sockaddr.cc index 7653617..d34b8ec 100644 --- a/src/kudu/util/net/sockaddr.cc +++ b/src/kudu/util/net/sockaddr.cc @@ -227,6 +227,9 @@ const struct sockaddr_in& Sockaddr::ipv4_addr() const { } std::string Sockaddr::ToString() const { + if (!is_initialized()) { + return "<uninitialized>"; + } switch (family()) { case AF_INET: return Substitute("$0:$1", host(), port()); diff --git a/src/kudu/util/net/sockaddr.h b/src/kudu/util/net/sockaddr.h index e473c8b..6fb9e59 100644 --- a/src/kudu/util/net/sockaddr.h +++ b/src/kudu/util/net/sockaddr.h @@ -23,7 +23,7 @@ #include <cstdint> #include <string> -#include <type_traits> +#include <utility> #include <vector> #include <glog/logging.h>
