client: rename various function calls and classes to ConnectToCluster This is a straight rename patch in preparation for adding the fetching of authentication tokens and CA information upon first connection to the master.
Change-Id: I96bdfc1c6bb3758841d31d728b168bac8ac78ec0 Reviewed-on: http://gerrit.cloudera.org:8080/5868 Reviewed-by: Adar Dembo <[email protected]> Tested-by: Todd Lipcon <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1ce41a4a Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1ce41a4a Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1ce41a4a Branch: refs/heads/master Commit: 1ce41a4a5efc19d33e02b7179a98681b1ead43af Parents: 6aba5ef Author: Todd Lipcon <[email protected]> Authored: Wed Feb 1 18:34:44 2017 -0800 Committer: Todd Lipcon <[email protected]> Committed: Fri Feb 3 01:08:12 2017 +0000 ---------------------------------------------------------------------- src/kudu/client/client-internal.cc | 30 +++++++++--------- src/kudu/client/client-internal.h | 32 +++++++++----------- src/kudu/client/client.cc | 4 +-- src/kudu/client/master_rpc.cc | 32 ++++++++++---------- src/kudu/client/master_rpc.h | 24 +++++++-------- src/kudu/client/meta_cache.cc | 2 +- .../integration-tests/external_mini_cluster.cc | 18 +++++------ src/kudu/master/master.proto | 6 ++++ 8 files changed, 75 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/1ce41a4a/src/kudu/client/client-internal.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc index 176d3d7..b8afa1d 100644 --- a/src/kudu/client/client-internal.cc +++ b/src/kudu/client/client-internal.cc @@ -83,7 +83,7 @@ using strings::Substitute; namespace client { -using internal::GetLeaderMasterRpc; +using internal::ConnectToClusterRpc; using internal::RemoteTablet; using internal::RemoteTabletServer; @@ -192,7 +192,7 @@ Status KuduClient::Data::SyncLeaderMasterRpc( << s.ToString(); if (client->IsMultiMaster()) { LOG(INFO) << "Determining 
the new leader Master and retrying..."; - WARN_NOT_OK(SetMasterServerProxy(client, deadline), + WARN_NOT_OK(ConnectToCluster(client, deadline), "Unable to determine the new leader Master"); continue; } @@ -206,7 +206,7 @@ Status KuduClient::Data::SyncLeaderMasterRpc( << "): " << s.ToString(); if (client->IsMultiMaster()) { LOG(INFO) << "Determining the new leader Master and retrying..."; - WARN_NOT_OK(SetMasterServerProxy(client, deadline), + WARN_NOT_OK(ConnectToCluster(client, deadline), "Unable to determine the new leader Master"); } continue; @@ -222,7 +222,7 @@ Status KuduClient::Data::SyncLeaderMasterRpc( resp->error().code() == MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) { if (client->IsMultiMaster()) { KLOG_EVERY_N_SECS(INFO, 1) << "Determining the new leader Master and retrying..."; - WARN_NOT_OK(SetMasterServerProxy(client, deadline), + WARN_NOT_OK(ConnectToCluster(client, deadline), "Unable to determine the new leader Master"); continue; } @@ -599,8 +599,8 @@ Status KuduClient::Data::GetTableSchema(KuduClient* client, return Status::OK(); } -void KuduClient::Data::LeaderMasterDetermined(const Status& status, - const HostPort& host_port) { +void KuduClient::Data::ConnectedToClusterCb(const Status& status, + const HostPort& host_port) { Sockaddr leader_sock_addr; Status new_status = status; if (new_status.ok()) { @@ -624,16 +624,16 @@ void KuduClient::Data::LeaderMasterDetermined(const Status& status, } } -Status KuduClient::Data::SetMasterServerProxy(KuduClient* client, - const MonoTime& deadline) { +Status KuduClient::Data::ConnectToCluster(KuduClient* client, + const MonoTime& deadline) { Synchronizer sync; - SetMasterServerProxyAsync(client, deadline, sync.AsStatusCallback()); + ConnectToClusterAsync(client, deadline, sync.AsStatusCallback()); return sync.Wait(); } -void KuduClient::Data::SetMasterServerProxyAsync(KuduClient* client, - const MonoTime& deadline, - const StatusCallback& cb) { +void KuduClient::Data::ConnectToClusterAsync(KuduClient* 
client, + const MonoTime& deadline, + const StatusCallback& cb) { DCHECK(deadline.Initialized()); vector<Sockaddr> master_sockaddrs; @@ -659,7 +659,7 @@ void KuduClient::Data::SetMasterServerProxyAsync(KuduClient* client, master_sockaddrs.push_back(addrs[0]); } - // This ensures that no more than one GetLeaderMasterRpc is in + // This ensures that no more than one ConnectToClusterRpc is in // flight at a time -- there isn't much sense in requesting this information // in parallel, since the requests should end up with the same result. // Instead, we simply piggy-back onto the existing request by adding our own @@ -668,8 +668,8 @@ void KuduClient::Data::SetMasterServerProxyAsync(KuduClient* client, leader_master_callbacks_.push_back(cb); if (!leader_master_rpc_) { // No one is sending a request yet - we need to be the one to do it. - leader_master_rpc_.reset(new internal::GetLeaderMasterRpc( - Bind(&KuduClient::Data::LeaderMasterDetermined, + leader_master_rpc_.reset(new internal::ConnectToClusterRpc( + Bind(&KuduClient::Data::ConnectedToClusterCb, Unretained(this)), std::move(master_sockaddrs), deadline, http://git-wip-us.apache.org/repos/asf/kudu/blob/1ce41a4a/src/kudu/client/client-internal.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h index 1fbf15e..ca0aeba 100644 --- a/src/kudu/client/client-internal.h +++ b/src/kudu/client/client-internal.h @@ -52,7 +52,7 @@ class RpcController; namespace client { namespace internal { -class GetLeaderMasterRpc; +class ConnectToClusterRpc; } // namespace internal class KuduClient::Data { @@ -134,11 +134,10 @@ class KuduClient::Data { // Sets 'master_proxy_' from the address specified by // 'leader_master_hostport_'. Called by - // GetLeaderMasterRpc::SendRpcCb() upon successful completion. + // ConnectToClusterRpc::SendRpcCb() upon successful completion. // - // See also: SetMasterServerProxyAsync. 
- void LeaderMasterDetermined(const Status& status, - const HostPort& host_port); + // See also: ConnectToClusterAsync. + void ConnectedToClusterCb(const Status& status, const HostPort& host_port); // Asynchronously sets 'master_proxy_' to the leader master by // cycling through servers listed in 'master_server_addrs_' until @@ -148,18 +147,15 @@ class KuduClient::Data { // Invokes 'cb' with the appropriate status when finished. // // Works with both a distributed and non-distributed configuration. - void SetMasterServerProxyAsync(KuduClient* client, - const MonoTime& deadline, - const StatusCallback& cb); + void ConnectToClusterAsync(KuduClient* client, + const MonoTime& deadline, + const StatusCallback& cb); - // Synchronous version of SetMasterServerProxyAsync method above. + // Synchronous version of ConnectToClusterAsync method above. // // NOTE: since this uses a Synchronizer, this may not be invoked by // a method that's on a reactor thread. - // - // TODO(todd): rename to ReconnectToMasters or something - Status SetMasterServerProxy(KuduClient* client, - const MonoTime& deadline); + Status ConnectToCluster(KuduClient* client, const MonoTime& deadline); std::shared_ptr<master::MasterServiceProxy> master_proxy() const; @@ -220,23 +216,23 @@ class KuduClient::Data { MonoDelta default_rpc_timeout_; // The host port of the leader master. This is set in - // LeaderMasterDetermined, which is invoked as a callback by - // SetMasterServerProxyAsync. + // ConnectedToClusterCb, which is invoked as a callback by + // ConnectToClusterAsync. HostPort leader_master_hostport_; // Proxy to the leader master. std::shared_ptr<master::MasterServiceProxy> master_proxy_; - // Ref-counted RPC instance: since 'SetMasterServerProxyAsync' call + // Ref-counted RPC instance: since 'ConnectToClusterAsync' call // is asynchronous, we need to hold a reference in this class // itself, as to avoid a "use-after-free" scenario. 
- scoped_refptr<internal::GetLeaderMasterRpc> leader_master_rpc_; + scoped_refptr<internal::ConnectToClusterRpc> leader_master_rpc_; std::vector<StatusCallback> leader_master_callbacks_; // Protects 'leader_master_rpc_', 'leader_master_hostport_', // and master_proxy_ // - // See: KuduClient::Data::SetMasterServerProxyAsync for a more + // See: KuduClient::Data::ConnectToClusterAsync for a more // in-depth explanation of why this is needed and how it works. mutable simple_spinlock leader_master_lock_; http://git-wip-us.apache.org/repos/asf/kudu/blob/1ce41a4a/src/kudu/client/client.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc index 533e0af..99b37f2 100644 --- a/src/kudu/client/client.cc +++ b/src/kudu/client/client.cc @@ -244,8 +244,8 @@ Status KuduClientBuilder::Build(shared_ptr<KuduClient>* client) { // Let's allow for plenty of time for discovering the master the first // time around. MonoTime deadline = MonoTime::Now() + c->default_admin_operation_timeout(); - RETURN_NOT_OK_PREPEND(c->data_->SetMasterServerProxy(c.get(), deadline), - "Could not locate the leader master"); + RETURN_NOT_OK_PREPEND(c->data_->ConnectToCluster(c.get(), deadline), + "Could not connect to the cluster"); c->data_->meta_cache_.reset(new MetaCache(c.get())); c->data_->dns_resolver_.reset(new DnsResolver()); http://git-wip-us.apache.org/repos/asf/kudu/blob/1ce41a4a/src/kudu/client/master_rpc.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/master_rpc.cc b/src/kudu/client/master_rpc.cc index 4e032ec..a04cc32 100644 --- a/src/kudu/client/master_rpc.cc +++ b/src/kudu/client/master_rpc.cc @@ -143,14 +143,14 @@ void GetMasterRegistrationRpc::SendRpcCb(const Status& status) { } // anonymous namespace //////////////////////////////////////////////////////////// -// GetLeaderMasterRpc +// ConnectToClusterRpc 
//////////////////////////////////////////////////////////// -GetLeaderMasterRpc::GetLeaderMasterRpc(LeaderCallback user_cb, - vector<Sockaddr> addrs, - MonoTime deadline, - MonoDelta rpc_timeout, - shared_ptr<Messenger> messenger) +ConnectToClusterRpc::ConnectToClusterRpc(LeaderCallback user_cb, + vector<Sockaddr> addrs, + MonoTime deadline, + MonoDelta rpc_timeout, + shared_ptr<Messenger> messenger) : Rpc(deadline, std::move(messenger)), user_cb_(std::move(user_cb)), addrs_(std::move(addrs)), @@ -163,20 +163,20 @@ GetLeaderMasterRpc::GetLeaderMasterRpc(LeaderCallback user_cb, responses_.resize(addrs_.size()); } -GetLeaderMasterRpc::~GetLeaderMasterRpc() { +ConnectToClusterRpc::~ConnectToClusterRpc() { } -string GetLeaderMasterRpc::ToString() const { +string ConnectToClusterRpc::ToString() const { vector<string> sockaddr_str; for (const Sockaddr& addr : addrs_) { sockaddr_str.push_back(addr.ToString()); } - return strings::Substitute("GetLeaderMasterRpc(addrs: $0, num_attempts: $1)", + return strings::Substitute("ConnectToClusterRpc(addrs: $0, num_attempts: $1)", JoinStrings(sockaddr_str, ","), num_attempts()); } -void GetLeaderMasterRpc::SendRpc() { +void ConnectToClusterRpc::SendRpc() { // Compute the actual deadline to use for each RPC. 
MonoTime rpc_deadline = MonoTime::Now() + rpc_timeout_; MonoTime actual_deadline = MonoTime::Earliest(retrier().deadline(), @@ -185,7 +185,7 @@ void GetLeaderMasterRpc::SendRpc() { std::lock_guard<simple_spinlock> l(lock_); for (int i = 0; i < addrs_.size(); i++) { GetMasterRegistrationRpc* rpc = new GetMasterRegistrationRpc( - Bind(&GetLeaderMasterRpc::GetMasterRegistrationRpcCbForNode, + Bind(&ConnectToClusterRpc::SingleNodeCallback, this, ConstRef(addrs_[i]), ConstRef(responses_[i])), addrs_[i], actual_deadline, @@ -196,11 +196,11 @@ void GetLeaderMasterRpc::SendRpc() { } } -void GetLeaderMasterRpc::SendRpcCb(const Status& status) { +void ConnectToClusterRpc::SendRpcCb(const Status& status) { // To safely retry, we must reset completed_ so that it can be reused in the // next round of RPCs. // - // The SendRpcCb invariant (see GetMasterRegistrationRpcCbForNode comments) + // The SendRpcCb invariant (see SingleNodeCallback comments) // implies that if we're to retry, we must be the last response. Thus, it is // safe to reset completed_ in this case; there's no danger of a late // response reading it and entering SendRpcCb inadvertently. @@ -228,9 +228,9 @@ void GetLeaderMasterRpc::SendRpcCb(const Status& status) { user_cb_.Run(status, leader_master_); } -void GetLeaderMasterRpc::GetMasterRegistrationRpcCbForNode(const Sockaddr& node_addr, - const ServerEntryPB& resp, - const Status& status) { +void ConnectToClusterRpc::SingleNodeCallback(const Sockaddr& node_addr, + const ServerEntryPB& resp, + const Status& status) { // TODO(todd): handle the situation where one Master is partitioned from // the rest of the Master consensus configuration, all are reachable by the client, // and the partitioned node "thinks" it's the leader. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/1ce41a4a/src/kudu/client/master_rpc.h ---------------------------------------------------------------------- diff --git a/src/kudu/client/master_rpc.h b/src/kudu/client/master_rpc.h index 9159a7f..3170693 100644 --- a/src/kudu/client/master_rpc.h +++ b/src/kudu/client/master_rpc.h @@ -54,8 +54,8 @@ namespace internal { // The class is reference counted to avoid a "use-after-free" // scenario, when responses to the RPC return to the caller _after_ a // leader has already been found. -class GetLeaderMasterRpc : public rpc::Rpc, - public RefCountedThreadSafe<GetLeaderMasterRpc> { +class ConnectToClusterRpc : public rpc::Rpc, + public RefCountedThreadSafe<ConnectToClusterRpc> { public: typedef Callback<void(const Status&, const HostPort&)> LeaderCallback; // The host and port of the leader master server is stored in @@ -65,18 +65,18 @@ class GetLeaderMasterRpc : public rpc::Rpc, // Calls 'user_cb' when the leader is found, or if no leader can be found // until 'deadline' passes. Each RPC has 'rpc_timeout' time to complete // before it times out and may be retried if 'deadline' has not yet passed. 
- GetLeaderMasterRpc(LeaderCallback user_cb, - std::vector<Sockaddr> addrs, - MonoTime deadline, - MonoDelta rpc_timeout, - std::shared_ptr<rpc::Messenger> messenger); + ConnectToClusterRpc(LeaderCallback user_cb, + std::vector<Sockaddr> addrs, + MonoTime deadline, + MonoDelta rpc_timeout, + std::shared_ptr<rpc::Messenger> messenger); virtual void SendRpc() OVERRIDE; virtual std::string ToString() const OVERRIDE; private: - friend class RefCountedThreadSafe<GetLeaderMasterRpc>; - ~GetLeaderMasterRpc(); + friend class RefCountedThreadSafe<ConnectToClusterRpc>; + ~ConnectToClusterRpc(); virtual void SendRpcCb(const Status& status) OVERRIDE; @@ -86,9 +86,9 @@ class GetLeaderMasterRpc : public rpc::Rpc, // Invokes SendRpcCb if the response indicates that the specified // master is a leader, or if responses have been received from all // of the Masters. - void GetMasterRegistrationRpcCbForNode(const Sockaddr& node_addr, - const ServerEntryPB& resp, - const Status& status); + void SingleNodeCallback(const Sockaddr& node_addr, + const ServerEntryPB& resp, + const Status& status); LeaderCallback user_cb_; std::vector<Sockaddr> addrs_; http://git-wip-us.apache.org/repos/asf/kudu/blob/1ce41a4a/src/kudu/client/meta_cache.cc ---------------------------------------------------------------------- diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc index eb40f68..9be8d64 100644 --- a/src/kudu/client/meta_cache.cc +++ b/src/kudu/client/meta_cache.cc @@ -672,7 +672,7 @@ string LookupRpc::ToString() const { } void LookupRpc::ResetMasterLeaderAndRetry() { - table_->client()->data_->SetMasterServerProxyAsync( + table_->client()->data_->ConnectToClusterAsync( table_->client(), retrier().deadline(), Bind(&LookupRpc::NewLeaderMasterDeterminedCb, http://git-wip-us.apache.org/repos/asf/kudu/blob/1ce41a4a/src/kudu/integration-tests/external_mini_cluster.cc ---------------------------------------------------------------------- diff --git 
a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc index a9ee499..ff0cbae 100644 --- a/src/kudu/integration-tests/external_mini_cluster.cc +++ b/src/kudu/integration-tests/external_mini_cluster.cc @@ -51,7 +51,7 @@ #include "kudu/util/subprocess.h" #include "kudu/util/test_util.h" -using kudu::client::internal::GetLeaderMasterRpc; +using kudu::client::internal::ConnectToClusterRpc; using kudu::master::ListTablesRequestPB; using kudu::master::ListTablesResponsePB; using kudu::master::MasterServiceProxy; @@ -474,7 +474,7 @@ void LeaderMasterCallback(HostPort* dst_hostport, } // anonymous namespace Status ExternalMiniCluster::GetLeaderMasterIndex(int* idx) { - scoped_refptr<GetLeaderMasterRpc> rpc; + scoped_refptr<ConnectToClusterRpc> rpc; Synchronizer sync; vector<Sockaddr> addrs; HostPort leader_master_hp; @@ -483,13 +483,13 @@ Status ExternalMiniCluster::GetLeaderMasterIndex(int* idx) { for (const scoped_refptr<ExternalMaster>& master : masters_) { addrs.push_back(master->bound_rpc_addr()); } - rpc.reset(new GetLeaderMasterRpc(Bind(&LeaderMasterCallback, - &leader_master_hp, - &sync), - std::move(addrs), - deadline, - MonoDelta::FromSeconds(5), - messenger_)); + rpc.reset(new ConnectToClusterRpc(Bind(&LeaderMasterCallback, + &leader_master_hp, + &sync), + std::move(addrs), + deadline, + MonoDelta::FromSeconds(5), + messenger_)); rpc->SendRpc(); RETURN_NOT_OK(sync.Wait()); bool found = false; http://git-wip-us.apache.org/repos/asf/kudu/blob/1ce41a4a/src/kudu/master/master.proto ---------------------------------------------------------------------- diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto index e17e73c..acdf0e9 100644 --- a/src/kudu/master/master.proto +++ b/src/kudu/master/master.proto @@ -627,6 +627,12 @@ service MasterService { // Administrative/monitoring RPCs rpc ListTabletServers(ListTabletServersRequestPB) returns (ListTabletServersResponsePB); rpc 
ListMasters(ListMastersRequestPB) returns (ListMastersResponsePB); + + // TODO(todd): rename this RPC to ConnectToCluster() or somesuch. It's only used by + // the client. However, we need to keep in mind compatibility. We'll probably have to + // do this by adding a new RPC which uses the existing protos, and having the server + // implement it, but the client not send it. After it's been out for a few releases, + // we can start sending it from clients? rpc GetMasterRegistration(GetMasterRegistrationRequestPB) returns (GetMasterRegistrationResponsePB); }
