This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 3884a6388b2696a826b8903144ae555faa595473
Author: Andrew Wong <[email protected]>
AuthorDate: Sat Sep 18 00:50:39 2021 -0700

    [consensus] KUDU-1620: re-resolve consensus peers on network error
    
    This plumbs the work from KUDU-75 into the long-lived consensus proxy,
    allowing Raft peers to re-resolve on error.
    
    This has the knock-on effect that masters starting up also re-resolve
    other masters' addresses when attempting to fetch UUIDs, since this
    process also uses consensus proxies.
    
    Change-Id: Ibd1b68c3c14d7d8f81168e16fe450d2ffcce840b
    Reviewed-on: http://gerrit.cloudera.org:8080/17868
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/consensus/consensus_peers.cc              |  15 +-
 src/kudu/integration-tests/dns_alias-itest.cc      | 178 +++++++++++++++++++++
 .../integration-tests/tablet_replacement-itest.cc  |   8 +-
 src/kudu/mini-cluster/external_mini_cluster.h      |   6 +
 src/kudu/mini-cluster/mini_cluster.cc              |   5 +-
 src/kudu/mini-cluster/mini_cluster.h               |   4 +-
 src/kudu/rpc/proxy.cc                              |   4 +
 src/kudu/util/net/net_util.cc                      |   8 +-
 8 files changed, 209 insertions(+), 19 deletions(-)

diff --git a/src/kudu/consensus/consensus_peers.cc 
b/src/kudu/consensus/consensus_peers.cc
index 9e1b9fe..27dd2a1 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -48,9 +48,7 @@
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
-#include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/net/net_util.h"
-#include "kudu/util/net/sockaddr.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/threadpool.h"
@@ -321,7 +319,8 @@ void Peer::StartElection() {
   // TODO(adar): lack of C++14 move capture makes for ugly code.
   RunLeaderElectionResponsePB* resp = resp_uniq.release();
   RpcController* controller = controller_uniq.release();
-  proxy_->StartElectionAsync(req, resp, controller, [resp, controller, 
peer_uuid]() {
+  auto s_this = shared_from_this();
+  proxy_->StartElectionAsync(req, resp, controller, [resp, controller, 
peer_uuid, s_this]() {
       unique_ptr<RunLeaderElectionResponsePB> r(resp);
       unique_ptr<RpcController> c(controller);
       string error_msg = Substitute("unable to start election on peer $0", 
peer_uuid);
@@ -588,14 +587,8 @@ Status CreateConsensusServiceProxyForHost(
     const shared_ptr<Messenger>& messenger,
     DnsResolver* dns_resolver,
     unique_ptr<ConsensusServiceProxy>* new_proxy) {
-  vector<Sockaddr> addrs;
-  RETURN_NOT_OK(dns_resolver->ResolveAddresses(hostport, &addrs));
-  if (addrs.size() > 1) {
-    LOG(WARNING) << Substitute(
-        "Peer address '$0' resolves to $1 different addresses. "
-        "Using $2", hostport.ToString(), addrs.size(), addrs[0].ToString());
-  }
-  new_proxy->reset(new ConsensusServiceProxy(messenger, addrs[0], 
hostport.host()));
+  new_proxy->reset(new ConsensusServiceProxy(messenger, hostport, 
dns_resolver));
+  (*new_proxy)->Init();
   return Status::OK();
 }
 
diff --git a/src/kudu/integration-tests/dns_alias-itest.cc 
b/src/kudu/integration-tests/dns_alias-itest.cc
index 0e92cc3..a801005 100644
--- a/src/kudu/integration-tests/dns_alias-itest.cc
+++ b/src/kudu/integration-tests/dns_alias-itest.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <cstdint>
+#include <functional>
 #include <memory>
 #include <string>
 #include <utility>
@@ -26,15 +27,19 @@
 
 #include "kudu/client/client.h"
 #include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/cluster_verifier.h"
 #include "kudu/integration-tests/test_workload.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/mini-cluster/mini_cluster.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -48,6 +53,7 @@ METRIC_DECLARE_entity(server);
 
 using kudu::client::KuduClient;
 using kudu::client::KuduTabletServer;
+using kudu::cluster::ExternalDaemon;
 using kudu::cluster::ExternalMiniCluster;
 using kudu::cluster::ExternalMiniClusterOptions;
 using std::string;
@@ -89,6 +95,73 @@ class DnsAliasITest : public KuduTest {
   void TearDown() override {
     NO_FATALS(cluster_->AssertNoCrashes());
   }
+
+  // Get the new DNS override string when restarting the last node of the given
+  // daemon type with the given reserved address.
+  string GetNewOverridesFlag(ExternalMiniCluster::DaemonType node_type,
+                             const Sockaddr& new_addr) {
+    int master_end_idx = cluster_->num_masters();
+    int tserver_end_idx = cluster_->num_tablet_servers();
+    bool is_master = node_type == ExternalMiniCluster::DaemonType::MASTER;
+    if (is_master) {
+      --master_end_idx;
+    } else {
+      --tserver_end_idx;
+    }
+    vector<string> new_overrides;
+    new_overrides.reserve(cluster_->num_masters() + 
cluster_->num_tablet_servers());
+    for (int i = 0; i < master_end_idx; i++) {
+      new_overrides.emplace_back(Substitute("$0.$1=$2", kMasterHostPrefix, i,
+                                            
cluster_->master(i)->bound_rpc_addr().ToString()));
+    }
+    for (int i = 0; i < tserver_end_idx; i++) {
+      new_overrides.emplace_back(
+          Substitute("$0.$1=$2", kTServerHostPrefix, i,
+                     cluster_->tablet_server(i)->bound_rpc_addr().ToString()));
+    }
+    new_overrides.emplace_back(
+        Substitute("$0.$1=$2", is_master ? kMasterHostPrefix : 
kTServerHostPrefix,
+                   is_master ? master_end_idx : tserver_end_idx,
+                   new_addr.ToString()));
+    return JoinStrings(new_overrides, ",");
+  }
+
+  // Adds the appropriate flags for the given daemon to be restarted bound to
+  // the given address.
+  void SetUpDaemonForNewAddr(const Sockaddr& new_addr, const string& 
new_overrides_str,
+                             ExternalDaemon* daemon) {
+    HostPort new_ip_hp(new_addr.host(), new_addr.port());
+    daemon->SetRpcBindAddress(new_ip_hp);
+    daemon->mutable_flags()->emplace_back("--rpc_reuseport=true");
+    // TODO(awong): more plumbing is needed to allow the server to startup with
+    // the webserver, so just disable it.
+    daemon->mutable_flags()->emplace_back("--webserver_enabled=false");
+    daemon->mutable_flags()->emplace_back(
+        Substitute("--dns_addr_resolution_override=$0", new_overrides_str));
+  }
+
+  // Sets the flags on all nodes in the cluster, except for the last node of
+  // the given 'node_type', which is expected to have been restarted with the
+  // appropriate flags.
+  void SetFlagsOnRemainingCluster(ExternalMiniCluster::DaemonType node_type,
+                                  const string& new_overrides_str) {
+    int master_end_idx = cluster_->num_masters();
+    int tserver_end_idx = cluster_->num_tablet_servers();
+    if (node_type == ExternalMiniCluster::DaemonType::MASTER) {
+      --master_end_idx;
+    } else {
+      --tserver_end_idx;
+    }
+    for (int i = 0; i < master_end_idx; i++) {
+      ASSERT_OK(cluster_->SetFlag(
+          cluster_->master(i), "dns_addr_resolution_override", 
new_overrides_str));
+    }
+    for (int i = 0; i < tserver_end_idx; i++) {
+      ASSERT_OK(cluster_->SetFlag(
+          cluster_->tablet_server(i), "dns_addr_resolution_override", 
new_overrides_str));
+    }
+  }
+
  protected:
   unique_ptr<ExternalMiniCluster> cluster_;
   client::sp::shared_ptr<KuduClient> client_;
@@ -180,5 +253,110 @@ TEST_F(DnsAliasWithUnixSocketsITest, TestBasic) {
   }
 }
 
+// These tests depend on restarted servers being assigned a new IP address. On
+// MacOS, tservers are all assigned the same address, so don't run them there.
+#if defined(__linux__)
+
+// Regression test for KUDU-1620, wherein consensus proxies don't eventually
+// succeed when the address changes but the host/port stays the same.
+TEST_F(DnsAliasITest, Kudu1620) {
+  TestWorkload w(cluster_.get());
+  w.set_num_replicas(3);
+  w.set_num_write_threads(1);
+  w.Setup();
+  w.Start();
+  while (w.rows_inserted() < 10) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  w.StopAndJoin();
+
+  // Shut down a tablet server and start one up at a different IP.
+  auto* tserver = cluster_->tablet_server(cluster_->num_tablet_servers() - 1);
+  tserver->Shutdown();
+  unique_ptr<Socket> reserved_socket;
+  
ASSERT_OK(cluster_->ReserveDaemonSocket(cluster::ExternalMiniCluster::DaemonType::TSERVER,
 3,
+                                          kDefaultBindMode, &reserved_socket,
+                                          
tserver->bound_rpc_hostport().port()));
+  Sockaddr new_addr;
+  ASSERT_OK(reserved_socket->GetSocketAddress(&new_addr));
+
+  // Once we start having the other servers communicate with the new tserver,
+  // ksck should return healthy.
+  auto new_overrides_str = 
GetNewOverridesFlag(ExternalMiniCluster::DaemonType::TSERVER, new_addr);
+  SetUpDaemonForNewAddr(new_addr, new_overrides_str, tserver);
+  ASSERT_OK(tserver->Restart());
+
+  // Running ksck should fail because the existing servers are still trying to
+  // communicate with the old address.
+  ClusterVerifier v(cluster_.get());
+  Status s = v.RunKsck();
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+
+  SetFlagsOnRemainingCluster(ExternalMiniCluster::DaemonType::TSERVER, 
new_overrides_str);
+
+  // Our test thread still thinks the old alias is valid, so our ksck
+  // should fail.
+  s = v.RunKsck();
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+
+  // Once we set the DNS aliases in the test thread, ksck should succeed.
+  FLAGS_dns_addr_resolution_override = new_overrides_str;
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(v.RunKsck());
+  });
+}
+
+// Master-side regression test for KUDU-1620. Masters instantiate consensus
+// proxies to get the UUIDs of their peers. With KUDU-1620 resolved, the proxy
+// used should be able to re-resolve and retry upon failure, rather than
+// retrying at the same address.
+TEST_F(DnsAliasITest, TestMasterReresolveOnStartup) {
+  const int last_master_idx = cluster_->num_masters() - 1;
+  auto* master = cluster_->master(last_master_idx);
+
+  // Shut down and prepare the node that we're going to give a new address.
+  master->Shutdown();
+  unique_ptr<Socket> reserved_socket;
+  
ASSERT_OK(cluster_->ReserveDaemonSocket(cluster::ExternalMiniCluster::DaemonType::MASTER,
 3,
+                                          kDefaultBindMode, &reserved_socket,
+                                          
master->bound_rpc_hostport().port()));
+  Sockaddr new_addr;
+  ASSERT_OK(reserved_socket->GetSocketAddress(&new_addr));
+  auto new_overrides_str = 
GetNewOverridesFlag(ExternalMiniCluster::DaemonType::MASTER, new_addr);
+  SetUpDaemonForNewAddr(new_addr, new_overrides_str, master);
+
+  // Shut down the other masters so we can test what happens when they come
+  // back up.
+  for (int i = 0; i < last_master_idx; i++) {
+    cluster_->master(i)->Shutdown();
+  }
+  for (int i = 0; i < last_master_idx; i++) {
+    ASSERT_OK(cluster_->master(i)->Restart());
+  }
+  // Since the rest of the cluster doesn't know about the address, ksck will
+  // fail.
+  ClusterVerifier v(cluster_.get());
+  Status s = v.RunKsck();
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+
+  // Even upon setting the DNS overrides on the rest of the nodes, since the
+  // master hasn't started, we should still see an error.
+  SetFlagsOnRemainingCluster(ExternalMiniCluster::DaemonType::MASTER, 
new_overrides_str);
+  s = v.RunKsck();
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+  FLAGS_dns_addr_resolution_override = new_overrides_str;
+  s = v.RunKsck();
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+
+  // Upon restarting the node, the other masters should be able to resolve and
+  // connect to it.
+  ASSERT_OK(master->Restart());
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(v.RunKsck());
+  });
+}
+
+#endif
+
 } // namespace itest
 } // namespace kudu
diff --git a/src/kudu/integration-tests/tablet_replacement-itest.cc 
b/src/kudu/integration-tests/tablet_replacement-itest.cc
index 65ca217..bc9b367 100644
--- a/src/kudu/integration-tests/tablet_replacement-itest.cc
+++ b/src/kudu/integration-tests/tablet_replacement-itest.cc
@@ -30,10 +30,10 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol-test-util.h"
 #include "kudu/common/wire_protocol.h"
-#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -237,8 +237,10 @@ void 
TabletReplacementITest::TestDontEvictIfRemainingConfigIsUnstable(
     consensus::ConsensusStatePB cstate;
     ASSERT_OK(GetConsensusState(leader_ts, tablet_id, kTimeout, 
EXCLUDE_HEALTH_REPORT, &cstate));
     SCOPED_TRACE(cstate.DebugString());
-    ASSERT_FALSE(cstate.has_pending_config())
-        << "Leader should not have issued any config change";
+    // It's possible the leader only registered one replica as failed when
+    // sending its report to the master, so the master may have requested a
+    // change config request to add non-voter. Regardless, there should be no
+    // new committed config since a majority is down.
     ASSERT_EQ(cstate_initial.committed_config().opid_index(),
               cstate.committed_config().opid_index())
         << "Leader should not have issued any config change";
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h 
b/src/kudu/mini-cluster/external_mini_cluster.h
index 0a2e830..971727b 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -669,6 +669,12 @@ class ExternalDaemon : public 
RefCountedThreadSafe<ExternalDaemon> {
   // Return the options used to create the daemon.
   ExternalDaemonOptions opts() const { return opts_; }
 
+  void SetRpcBindAddress(HostPort rpc_hostport) {
+    DCHECK(!IsProcessAlive());
+    bound_rpc_ = std::move(rpc_hostport);
+    opts_.rpc_bind_address = bound_rpc_;
+  }
+
  protected:
   friend class RefCountedThreadSafe<ExternalDaemon>;
   virtual ~ExternalDaemon();
diff --git a/src/kudu/mini-cluster/mini_cluster.cc 
b/src/kudu/mini-cluster/mini_cluster.cc
index 3a48cda..7da352b 100644
--- a/src/kudu/mini-cluster/mini_cluster.cc
+++ b/src/kudu/mini-cluster/mini_cluster.cc
@@ -60,10 +60,11 @@ string MiniCluster::GetBindIpForDaemonWithType(DaemonType 
type,
 Status MiniCluster::ReserveDaemonSocket(DaemonType type,
                                         int index,
                                         BindMode bind_mode,
-                                        unique_ptr<Socket>* socket) {
+                                        unique_ptr<Socket>* socket,
+                                        uint16_t port) {
   string ip = GetBindIpForDaemonWithType(type, index, bind_mode);
   Sockaddr sock_addr;
-  RETURN_NOT_OK(sock_addr.ParseString(ip, 0));
+  RETURN_NOT_OK(sock_addr.ParseString(ip, port));
 
   unique_ptr<Socket> sock(new Socket());
   RETURN_NOT_OK(sock->Init(sock_addr.family(), 0));
diff --git a/src/kudu/mini-cluster/mini_cluster.h 
b/src/kudu/mini-cluster/mini_cluster.h
index 945a915..dd8e964 100644
--- a/src/kudu/mini-cluster/mini_cluster.h
+++ b/src/kudu/mini-cluster/mini_cluster.h
@@ -16,6 +16,7 @@
 // under the License.
 #pragma once
 
+#include <cstdint>
 #include <memory>
 #include <string>
 #include <vector>
@@ -140,7 +141,8 @@ class MiniCluster {
   static Status ReserveDaemonSocket(DaemonType type,
                                     int index,
                                     BindMode bind_mode,
-                                    std::unique_ptr<Socket>* socket);
+                                    std::unique_ptr<Socket>* socket,
+                                    uint16_t port = 0);
 
  protected:
   // Return the IP address that the daemon with the given type and index will
diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc
index fbda5de..9501a38 100644
--- a/src/kudu/rpc/proxy.cc
+++ b/src/kudu/rpc/proxy.cc
@@ -34,6 +34,7 @@
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/user_credentials.h"
 #include "kudu/util/kernel_stack_watchdog.h"
+#include "kudu/util/logging.h"
 #include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
@@ -227,6 +228,9 @@ void Proxy::AsyncRequest(const string& method,
     // TODO(awong): we should be more specific here -- consider having the RPC
     // layer set a flag in the controller that warrants a retry.
     if (PREDICT_FALSE(!controller->status().ok())) {
+      KLOG_EVERY_N_SECS(WARNING, 5)
+          << Substitute("Call had error, refreshing address and retrying: $0",
+                        controller->status().ToString());
       auto req_payload = controller->ReleaseRequestPayload();
       controller->Reset();
       RefreshDnsAndEnqueueRequest(method, std::move(req_payload), response, 
controller, callback);
diff --git a/src/kudu/util/net/net_util.cc b/src/kudu/util/net/net_util.cc
index f587041..54f577e 100644
--- a/src/kudu/util/net/net_util.cc
+++ b/src/kudu/util/net/net_util.cc
@@ -220,8 +220,12 @@ Status HostPort::ResolveAddresses(vector<Sockaddr>* 
addresses) const {
   TRACE_EVENT1("net", "HostPort::ResolveAddresses",
                "host", host_);
   TRACE_COUNTER_SCOPE_LATENCY_US("dns_us");
-  if (PREDICT_FALSE(!FLAGS_dns_addr_resolution_override.empty())) {
-    vector<string> hosts_and_addrs = Split(FLAGS_dns_addr_resolution_override, 
",");
+  // NOTE: we use this instead of the FLAGS_... variant because this flag may 
be
+  // changed at runtime in tests and thus needs to be thread-safe.
+  const auto dns_addr_resolution_override_flag =
+      google::GetCommandLineFlagInfoOrDie("dns_addr_resolution_override");
+  if (PREDICT_FALSE(!dns_addr_resolution_override_flag.current_value.empty())) 
{
+    vector<string> hosts_and_addrs = 
Split(dns_addr_resolution_override_flag.current_value, ",");
     for (const auto& ha : hosts_and_addrs) {
       vector<string> host_and_addr = Split(ha, "=");
       if (host_and_addr.size() != 2) {

Reply via email to