This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 3884a6388b2696a826b8903144ae555faa595473 Author: Andrew Wong <[email protected]> AuthorDate: Sat Sep 18 00:50:39 2021 -0700 [consensus] KUDU-1620: re-resolve consensus peers on network error This plumbs the work from KUDU-75 into the long-lived consensus proxy, allowing Raft peers to re-resolve on error. This has the knock-on effect that masters starting up also re-resolve other masters' address when attempting to fetch UUIDs, since this process also uses consensus proxies. Change-Id: Ibd1b68c3c14d7d8f81168e16fe450d2ffcce840b Reviewed-on: http://gerrit.cloudera.org:8080/17868 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> --- src/kudu/consensus/consensus_peers.cc | 15 +- src/kudu/integration-tests/dns_alias-itest.cc | 178 +++++++++++++++++++++ .../integration-tests/tablet_replacement-itest.cc | 8 +- src/kudu/mini-cluster/external_mini_cluster.h | 6 + src/kudu/mini-cluster/mini_cluster.cc | 5 +- src/kudu/mini-cluster/mini_cluster.h | 4 +- src/kudu/rpc/proxy.cc | 4 + src/kudu/util/net/net_util.cc | 8 +- 8 files changed, 209 insertions(+), 19 deletions(-) diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc index 9e1b9fe..27dd2a1 100644 --- a/src/kudu/consensus/consensus_peers.cc +++ b/src/kudu/consensus/consensus_peers.cc @@ -48,9 +48,7 @@ #include "kudu/util/flag_tags.h" #include "kudu/util/logging.h" #include "kudu/util/monotime.h" -#include "kudu/util/net/dns_resolver.h" #include "kudu/util/net/net_util.h" -#include "kudu/util/net/sockaddr.h" #include "kudu/util/pb_util.h" #include "kudu/util/status.h" #include "kudu/util/threadpool.h" @@ -321,7 +319,8 @@ void Peer::StartElection() { // TODO(adar): lack of C++14 move capture makes for ugly code. 
RunLeaderElectionResponsePB* resp = resp_uniq.release(); RpcController* controller = controller_uniq.release(); - proxy_->StartElectionAsync(req, resp, controller, [resp, controller, peer_uuid]() { + auto s_this = shared_from_this(); + proxy_->StartElectionAsync(req, resp, controller, [resp, controller, peer_uuid, s_this]() { unique_ptr<RunLeaderElectionResponsePB> r(resp); unique_ptr<RpcController> c(controller); string error_msg = Substitute("unable to start election on peer $0", peer_uuid); @@ -588,14 +587,8 @@ Status CreateConsensusServiceProxyForHost( const shared_ptr<Messenger>& messenger, DnsResolver* dns_resolver, unique_ptr<ConsensusServiceProxy>* new_proxy) { - vector<Sockaddr> addrs; - RETURN_NOT_OK(dns_resolver->ResolveAddresses(hostport, &addrs)); - if (addrs.size() > 1) { - LOG(WARNING) << Substitute( - "Peer address '$0' resolves to $1 different addresses. " - "Using $2", hostport.ToString(), addrs.size(), addrs[0].ToString()); - } - new_proxy->reset(new ConsensusServiceProxy(messenger, addrs[0], hostport.host())); + new_proxy->reset(new ConsensusServiceProxy(messenger, hostport, dns_resolver)); + (*new_proxy)->Init(); return Status::OK(); } diff --git a/src/kudu/integration-tests/dns_alias-itest.cc b/src/kudu/integration-tests/dns_alias-itest.cc index 0e92cc3..a801005 100644 --- a/src/kudu/integration-tests/dns_alias-itest.cc +++ b/src/kudu/integration-tests/dns_alias-itest.cc @@ -16,6 +16,7 @@ // under the License. 
#include <cstdint> +#include <functional> #include <memory> #include <string> #include <utility> @@ -26,15 +27,19 @@ #include "kudu/client/client.h" #include "kudu/gutil/stl_util.h" +#include "kudu/gutil/strings/join.h" #include "kudu/gutil/strings/split.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/integration-tests/cluster_itest_util.h" +#include "kudu/integration-tests/cluster_verifier.h" #include "kudu/integration-tests/test_workload.h" #include "kudu/mini-cluster/external_mini_cluster.h" +#include "kudu/mini-cluster/mini_cluster.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" #include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" @@ -48,6 +53,7 @@ METRIC_DECLARE_entity(server); using kudu::client::KuduClient; using kudu::client::KuduTabletServer; +using kudu::cluster::ExternalDaemon; using kudu::cluster::ExternalMiniCluster; using kudu::cluster::ExternalMiniClusterOptions; using std::string; @@ -89,6 +95,73 @@ class DnsAliasITest : public KuduTest { void TearDown() override { NO_FATALS(cluster_->AssertNoCrashes()); } + + // Get the new DNS override string when restarting the last node of the given + // daemon type with the given reserved address. 
+ string GetNewOverridesFlag(ExternalMiniCluster::DaemonType node_type, + const Sockaddr& new_addr) { + int master_end_idx = cluster_->num_masters(); + int tserver_end_idx = cluster_->num_tablet_servers(); + bool is_master = node_type == ExternalMiniCluster::DaemonType::MASTER; + if (is_master) { + --master_end_idx; + } else { + --tserver_end_idx; + } + vector<string> new_overrides; + new_overrides.reserve(cluster_->num_masters() + cluster_->num_tablet_servers()); + for (int i = 0; i < master_end_idx; i++) { + new_overrides.emplace_back(Substitute("$0.$1=$2", kMasterHostPrefix, i, + cluster_->master(i)->bound_rpc_addr().ToString())); + } + for (int i = 0; i < tserver_end_idx; i++) { + new_overrides.emplace_back( + Substitute("$0.$1=$2", kTServerHostPrefix, i, + cluster_->tablet_server(i)->bound_rpc_addr().ToString())); + } + new_overrides.emplace_back( + Substitute("$0.$1=$2", is_master ? kMasterHostPrefix : kTServerHostPrefix, + is_master ? master_end_idx : tserver_end_idx, + new_addr.ToString())); + return JoinStrings(new_overrides, ","); + } + + // Adds the appropriate flags for the given daemon to be restarted bound to + // the given address. + void SetUpDaemonForNewAddr(const Sockaddr& new_addr, const string& new_overrides_str, + ExternalDaemon* daemon) { + HostPort new_ip_hp(new_addr.host(), new_addr.port()); + daemon->SetRpcBindAddress(new_ip_hp); + daemon->mutable_flags()->emplace_back("--rpc_reuseport=true"); + // TODO(awong): more plumbing is needed to allow the server to startup with + // the webserver, so just disable it. + daemon->mutable_flags()->emplace_back("--webserver_enabled=false"); + daemon->mutable_flags()->emplace_back( + Substitute("--dns_addr_resolution_override=$0", new_overrides_str)); + } + + // Sets the flags on all nodes in the cluster, except for the last node of + // the given 'node_type', which is expected to have been restarted with the + // appropriate flags. 
+ void SetFlagsOnRemainingCluster(ExternalMiniCluster::DaemonType node_type, + const string& new_overrides_str) { + int master_end_idx = cluster_->num_masters(); + int tserver_end_idx = cluster_->num_tablet_servers(); + if (node_type == ExternalMiniCluster::DaemonType::MASTER) { + --master_end_idx; + } else { + --tserver_end_idx; + } + for (int i = 0; i < master_end_idx; i++) { + ASSERT_OK(cluster_->SetFlag( + cluster_->master(i), "dns_addr_resolution_override", new_overrides_str)); + } + for (int i = 0; i < tserver_end_idx; i++) { + ASSERT_OK(cluster_->SetFlag( + cluster_->tablet_server(i), "dns_addr_resolution_override", new_overrides_str)); + } + } + protected: unique_ptr<ExternalMiniCluster> cluster_; client::sp::shared_ptr<KuduClient> client_; @@ -180,5 +253,110 @@ TEST_F(DnsAliasWithUnixSocketsITest, TestBasic) { } } +// These tests depend on restarted servers being assigned a new IP address. On +// MacOS, tservers are all assigned the same address, so don't run them there. +#if defined(__linux__) + +// Regression test for KUDU-1620, wherein consensus proxies don't eventually +// succeed when the address changes but the host/port stays the same. +TEST_F(DnsAliasITest, Kudu1620) { + TestWorkload w(cluster_.get()); + w.set_num_replicas(3); + w.set_num_write_threads(1); + w.Setup(); + w.Start(); + while (w.rows_inserted() < 10) { + SleepFor(MonoDelta::FromMilliseconds(10)); + } + w.StopAndJoin(); + + // Shut down a tablet server and start one up at a different IP.
+ auto* tserver = cluster_->tablet_server(cluster_->num_tablet_servers() - 1); + tserver->Shutdown(); + unique_ptr<Socket> reserved_socket; + ASSERT_OK(cluster_->ReserveDaemonSocket(cluster::ExternalMiniCluster::DaemonType::TSERVER, 3, + kDefaultBindMode, &reserved_socket, + tserver->bound_rpc_hostport().port())); + Sockaddr new_addr; + ASSERT_OK(reserved_socket->GetSocketAddress(&new_addr)); + + // Once we start having the other servers communicate with the new tserver, + // ksck should return healthy. + auto new_overrides_str = GetNewOverridesFlag(ExternalMiniCluster::DaemonType::TSERVER, new_addr); + SetUpDaemonForNewAddr(new_addr, new_overrides_str, tserver); + ASSERT_OK(tserver->Restart()); + + // Running ksck should fail because the existing servers are still trying to + // communicate with the old address. + ClusterVerifier v(cluster_.get()); + Status s = v.RunKsck(); + ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); + + SetFlagsOnRemainingCluster(ExternalMiniCluster::DaemonType::TSERVER, new_overrides_str); + + // Our test thread still thinks the old alias is valid, so our ksck + // should fail. + s = v.RunKsck(); + ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); + + // Once we set the DNS aliases in the test thread, ksck should succeed. + FLAGS_dns_addr_resolution_override = new_overrides_str; + ASSERT_EVENTUALLY([&] { + ASSERT_OK(v.RunKsck()); + }); +} + +// Master-side regression test for KUDU-1620. Masters instantiate consensus +// proxies to get the UUIDs of their peers. With KUDU-1620 resolved, the proxy +// used should be able to re-resolve and retry upon failure, rather than +// retrying at the same address. +TEST_F(DnsAliasITest, TestMasterReresolveOnStartup) { + const int last_master_idx = cluster_->num_masters() - 1; + auto* master = cluster_->master(last_master_idx); + + // Shut down and prepare the node that we're going to give a new address.
+ master->Shutdown(); + unique_ptr<Socket> reserved_socket; + ASSERT_OK(cluster_->ReserveDaemonSocket(cluster::ExternalMiniCluster::DaemonType::MASTER, 3, + kDefaultBindMode, &reserved_socket, + master->bound_rpc_hostport().port())); + Sockaddr new_addr; + ASSERT_OK(reserved_socket->GetSocketAddress(&new_addr)); + auto new_overrides_str = GetNewOverridesFlag(ExternalMiniCluster::DaemonType::MASTER, new_addr); + SetUpDaemonForNewAddr(new_addr, new_overrides_str, master); + + // Shut down the other masters so we can test what happens when they come + // back up. + for (int i = 0; i < last_master_idx; i++) { + cluster_->master(i)->Shutdown(); + } + for (int i = 0; i < last_master_idx; i++) { + ASSERT_OK(cluster_->master(i)->Restart()); + } + // Since the last master is still down and the rest of the cluster doesn't + // know about its new address, ksck will fail. + ClusterVerifier v(cluster_.get()); + Status s = v.RunKsck(); + ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); + + // Even upon setting the DNS overrides on the rest of the nodes, since the + // master hasn't started, we should still see an error. + SetFlagsOnRemainingCluster(ExternalMiniCluster::DaemonType::MASTER, new_overrides_str); + s = v.RunKsck(); + ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); + FLAGS_dns_addr_resolution_override = new_overrides_str; + s = v.RunKsck(); + ASSERT_TRUE(s.IsRuntimeError()) << s.ToString(); + + // Upon restarting the node, the other masters should be able to resolve and + // connect to it.
+ ASSERT_OK(master->Restart()); + ASSERT_EVENTUALLY([&] { + ASSERT_OK(v.RunKsck()); + }); +} + +#endif + } // namespace itest } // namespace kudu diff --git a/src/kudu/integration-tests/tablet_replacement-itest.cc b/src/kudu/integration-tests/tablet_replacement-itest.cc index 65ca217..bc9b367 100644 --- a/src/kudu/integration-tests/tablet_replacement-itest.cc +++ b/src/kudu/integration-tests/tablet_replacement-itest.cc @@ -30,10 +30,10 @@ #include <glog/logging.h> #include <gtest/gtest.h> +#include "kudu/common/row_operations.pb.h" #include "kudu/common/schema.h" #include "kudu/common/wire_protocol-test-util.h" #include "kudu/common/wire_protocol.h" -#include "kudu/common/wire_protocol.pb.h" #include "kudu/consensus/consensus.pb.h" #include "kudu/consensus/metadata.pb.h" #include "kudu/gutil/strings/substitute.h" @@ -237,8 +237,10 @@ void TabletReplacementITest::TestDontEvictIfRemainingConfigIsUnstable( consensus::ConsensusStatePB cstate; ASSERT_OK(GetConsensusState(leader_ts, tablet_id, kTimeout, EXCLUDE_HEALTH_REPORT, &cstate)); SCOPED_TRACE(cstate.DebugString()); - ASSERT_FALSE(cstate.has_pending_config()) - << "Leader should not have issued any config change"; + // It's possible the leader only registered one replica as failed when + // sending its report to the master, so the master may have requested a + // change config request to add non-voter. Regardless, there should be no + // new committed config since a majority is down. ASSERT_EQ(cstate_initial.committed_config().opid_index(), cstate.committed_config().opid_index()) << "Leader should not have issued any config change"; diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h index 0a2e830..971727b 100644 --- a/src/kudu/mini-cluster/external_mini_cluster.h +++ b/src/kudu/mini-cluster/external_mini_cluster.h @@ -669,6 +669,12 @@ class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> { // Return the options used to create the daemon. 
ExternalDaemonOptions opts() const { return opts_; } + void SetRpcBindAddress(HostPort rpc_hostport) { + DCHECK(!IsProcessAlive()); + bound_rpc_ = std::move(rpc_hostport); + opts_.rpc_bind_address = bound_rpc_; + } + protected: friend class RefCountedThreadSafe<ExternalDaemon>; virtual ~ExternalDaemon(); diff --git a/src/kudu/mini-cluster/mini_cluster.cc b/src/kudu/mini-cluster/mini_cluster.cc index 3a48cda..7da352b 100644 --- a/src/kudu/mini-cluster/mini_cluster.cc +++ b/src/kudu/mini-cluster/mini_cluster.cc @@ -60,10 +60,11 @@ string MiniCluster::GetBindIpForDaemonWithType(DaemonType type, Status MiniCluster::ReserveDaemonSocket(DaemonType type, int index, BindMode bind_mode, - unique_ptr<Socket>* socket) { + unique_ptr<Socket>* socket, + uint16_t port) { string ip = GetBindIpForDaemonWithType(type, index, bind_mode); Sockaddr sock_addr; - RETURN_NOT_OK(sock_addr.ParseString(ip, 0)); + RETURN_NOT_OK(sock_addr.ParseString(ip, port)); unique_ptr<Socket> sock(new Socket()); RETURN_NOT_OK(sock->Init(sock_addr.family(), 0)); diff --git a/src/kudu/mini-cluster/mini_cluster.h b/src/kudu/mini-cluster/mini_cluster.h index 945a915..dd8e964 100644 --- a/src/kudu/mini-cluster/mini_cluster.h +++ b/src/kudu/mini-cluster/mini_cluster.h @@ -16,6 +16,7 @@ // under the License. 
#pragma once +#include <cstdint> #include <memory> #include <string> #include <vector> @@ -140,7 +141,8 @@ class MiniCluster { static Status ReserveDaemonSocket(DaemonType type, int index, BindMode bind_mode, - std::unique_ptr<Socket>* socket); + std::unique_ptr<Socket>* socket, + uint16_t port = 0); protected: // Return the IP address that the daemon with the given type and index will diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc index fbda5de..9501a38 100644 --- a/src/kudu/rpc/proxy.cc +++ b/src/kudu/rpc/proxy.cc @@ -34,6 +34,7 @@ #include "kudu/rpc/rpc_controller.h" #include "kudu/rpc/user_credentials.h" #include "kudu/util/kernel_stack_watchdog.h" +#include "kudu/util/logging.h" #include "kudu/util/net/dns_resolver.h" #include "kudu/util/net/net_util.h" #include "kudu/util/net/sockaddr.h" @@ -227,6 +228,9 @@ void Proxy::AsyncRequest(const string& method, // TODO(awong): we should be more specific here -- consider having the RPC // layer set a flag in the controller that warrants a retry. if (PREDICT_FALSE(!controller->status().ok())) { + KLOG_EVERY_N_SECS(WARNING, 5) + << Substitute("Call had error, refreshing address and retrying: $0", + controller->status().ToString()); auto req_payload = controller->ReleaseRequestPayload(); controller->Reset(); RefreshDnsAndEnqueueRequest(method, std::move(req_payload), response, controller, callback); diff --git a/src/kudu/util/net/net_util.cc b/src/kudu/util/net/net_util.cc index f587041..54f577e 100644 --- a/src/kudu/util/net/net_util.cc +++ b/src/kudu/util/net/net_util.cc @@ -220,8 +220,12 @@ Status HostPort::ResolveAddresses(vector<Sockaddr>* addresses) const { TRACE_EVENT1("net", "HostPort::ResolveAddresses", "host", host_); TRACE_COUNTER_SCOPE_LATENCY_US("dns_us"); - if (PREDICT_FALSE(!FLAGS_dns_addr_resolution_override.empty())) { - vector<string> hosts_and_addrs = Split(FLAGS_dns_addr_resolution_override, ","); + // NOTE: we use this instead of the FLAGS_... 
variant because this flag may be + // changed at runtime in tests and thus needs to be thread-safe. + const auto dns_addr_resolution_override_flag = + google::GetCommandLineFlagInfoOrDie("dns_addr_resolution_override"); + if (PREDICT_FALSE(!dns_addr_resolution_override_flag.current_value.empty())) { + vector<string> hosts_and_addrs = Split(dns_addr_resolution_override_flag.current_value, ","); for (const auto& ha : hosts_and_addrs) { vector<string> host_and_addr = Split(ha, "="); if (host_and_addr.size() != 2) {
