master_rpc: pass back more details from ConnectToCluster

This plumbs back the ConnectToMasterResponsePB from the ConnectToCluster
RPC. This brings us one step closer to inserting the CA cert into the
TlsContext and passing the token somewhere useful.

Along the way, I switched to std::function instead of gutil Callback
since it allows the slightly easier lambda syntax.

No new tests here since all the existing client tests cover this code
path.

Change-Id: I3b5056c9f31237249516a2e7ae68d641c9f6bd02
Reviewed-on: http://gerrit.cloudera.org:8080/5892
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aser...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e95c29b6
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e95c29b6
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e95c29b6

Branch: refs/heads/master
Commit: e95c29b685bcd9d3d444ba46f8977a93fc8901a2
Parents: 7d43460
Author: Todd Lipcon <t...@apache.org>
Authored: Thu Feb 2 22:45:21 2017 -0800
Committer: Todd Lipcon <t...@apache.org>
Committed: Tue Feb 7 18:36:29 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/client-internal.cc              | 35 ++++++++++----------
 src/kudu/client/client-internal.h               | 10 +++---
 src/kudu/client/master_rpc.cc                   | 16 +++++----
 src/kudu/client/master_rpc.h                    | 29 ++++++++++------
 .../integration-tests/external_mini_cluster.cc  | 29 +++++++---------
 5 files changed, 63 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e95c29b6/src/kudu/client/client-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.cc 
b/src/kudu/client/client-internal.cc
index b8afa1d..6cdd335 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -18,6 +18,7 @@
 #include "kudu/client/client-internal.h"
 
 #include <algorithm>
+#include <functional>
 #include <limits>
 #include <memory>
 #include <mutex>
@@ -599,28 +600,24 @@ Status KuduClient::Data::GetTableSchema(KuduClient* 
client,
   return Status::OK();
 }
 
-void KuduClient::Data::ConnectedToClusterCb(const Status& status,
-                                            const HostPort& host_port) {
-  Sockaddr leader_sock_addr;
-  Status new_status = status;
-  if (new_status.ok()) {
-    new_status = SockaddrFromHostPort(host_port, &leader_sock_addr);
-  }
-
+void KuduClient::Data::ConnectedToClusterCb(
+    const Status& status,
+    const Sockaddr& leader_addr,
+    const master::ConnectToMasterResponsePB& /*connect_response*/) {
   vector<StatusCallback> cbs;
   {
     std::lock_guard<simple_spinlock> l(leader_master_lock_);
     cbs.swap(leader_master_callbacks_);
     leader_master_rpc_.reset();
 
-    if (new_status.ok()) {
-      leader_master_hostport_ = host_port;
-      master_proxy_.reset(new MasterServiceProxy(messenger_, 
leader_sock_addr));
+    if (status.ok()) {
+      leader_master_hostport_ = HostPort(leader_addr);
+      master_proxy_.reset(new MasterServiceProxy(messenger_, leader_addr));
     }
   }
 
   for (const StatusCallback& cb : cbs) {
-    cb.Run(new_status);
+    cb.Run(status);
   }
 }
 
@@ -669,12 +666,14 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* 
client,
   if (!leader_master_rpc_) {
     // No one is sending a request yet - we need to be the one to do it.
     leader_master_rpc_.reset(new internal::ConnectToClusterRpc(
-                               Bind(&KuduClient::Data::ConnectedToClusterCb,
-                                    Unretained(this)),
-                               std::move(master_sockaddrs),
-                               deadline,
-                               client->default_rpc_timeout(),
-                               messenger_));
+        std::bind(&KuduClient::Data::ConnectedToClusterCb, this,
+                  std::placeholders::_1,
+                  std::placeholders::_2,
+                  std::placeholders::_3),
+        std::move(master_sockaddrs),
+        deadline,
+        client->default_rpc_timeout(),
+        messenger_));
     l.unlock();
     leader_master_rpc_->SendRpc();
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/e95c29b6/src/kudu/client/client-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.h 
b/src/kudu/client/client-internal.h
index ca0aeba..6594945 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -39,6 +39,7 @@ class HostPort;
 
 namespace master {
 class AlterTableRequestPB;
+class ConnectToMasterResponsePB;
 class CreateTableRequestPB;
 class MasterServiceProxy;
 } // namespace master
@@ -132,12 +133,13 @@ class KuduClient::Data {
       const std::set<std::string>& blacklist,
       std::vector<internal::RemoteTabletServer*>* candidates) const;
 
-  // Sets 'master_proxy_' from the address specified by
-  // 'leader_master_hostport_'.  Called by
-  // ConnectToClusterRpc::SendRpcCb() upon successful completion.
+  // Sets 'master_proxy_' from the address specified by 'leader_addr'.
+  // Called by ConnectToClusterRpc::SendRpcCb() upon successful completion.
   //
   // See also: ConnectToClusterAsync.
-  void ConnectedToClusterCb(const Status& status, const HostPort& host_port);
+  void ConnectedToClusterCb(const Status& status,
+                            const Sockaddr& leader_addr,
+                            const master::ConnectToMasterResponsePB& 
connect_response);
 
   // Asynchronously sets 'master_proxy_' to the leader master by
   // cycling through servers listed in 'master_server_addrs_' until

http://git-wip-us.apache.org/repos/asf/kudu/blob/e95c29b6/src/kudu/client/master_rpc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/master_rpc.cc b/src/kudu/client/master_rpc.cc
index 8c0dbd6..3d97f3d 100644
--- a/src/kudu/client/master_rpc.cc
+++ b/src/kudu/client/master_rpc.cc
@@ -228,8 +228,7 @@ void ConnectToClusterRpc::SendRpc() {
   std::lock_guard<simple_spinlock> l(lock_);
   for (int i = 0; i < addrs_.size(); i++) {
     ConnectToMasterRpc* rpc = new ConnectToMasterRpc(
-        Bind(&ConnectToClusterRpc::SingleNodeCallback,
-             this, ConstRef(addrs_[i]), ConstRef(responses_[i])),
+        Bind(&ConnectToClusterRpc::SingleNodeCallback, this, i),
         addrs_[i],
         actual_deadline,
         retrier().messenger(),
@@ -268,12 +267,17 @@ void ConnectToClusterRpc::SendRpcCb(const Status& status) 
{
 
   // We are not retrying.
   undo_completed.cancel();
-  user_cb_.Run(status, leader_master_);
+  if (leader_idx_ != -1) {
+    user_cb_(status, addrs_[leader_idx_], responses_[leader_idx_]);
+  } else {
+    user_cb_(status, {}, {});
+  }
 }
 
-void ConnectToClusterRpc::SingleNodeCallback(const Sockaddr& node_addr,
-                                             const ConnectToMasterResponsePB& 
resp,
+void ConnectToClusterRpc::SingleNodeCallback(int master_idx,
                                              const Status& status) {
+  const ConnectToMasterResponsePB& resp = responses_[master_idx];
+
   // TODO(todd): handle the situation where one Master is partitioned from
   // the rest of the Master consensus configuration, all are reachable by the 
client,
   // and the partitioned node "thinks" it's the leader.
@@ -300,7 +304,7 @@ void ConnectToClusterRpc::SingleNodeCallback(const 
Sockaddr& node_addr,
         new_status = Status::NotFound("no leader found: " + ToString());
       } else {
         // We've found a leader.
-        leader_master_ = HostPort(node_addr);
+        leader_idx_ = master_idx;
         completed_ = true;
       }
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/e95c29b6/src/kudu/client/master_rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/master_rpc.h b/src/kudu/client/master_rpc.h
index 7fe278d..1e55bea 100644
--- a/src/kudu/client/master_rpc.h
+++ b/src/kudu/client/master_rpc.h
@@ -18,6 +18,7 @@
 // This module is internal to the client and not a public API.
 #pragma once
 
+#include <functional>
 #include <vector>
 #include <string>
 
@@ -63,7 +64,10 @@ namespace internal {
 class ConnectToClusterRpc : public rpc::Rpc,
                             public RefCountedThreadSafe<ConnectToClusterRpc> {
  public:
-  typedef Callback<void(const Status&, const HostPort&)> LeaderCallback;
+  typedef std::function<void(
+      const Status& status,
+      const Sockaddr& leader_master,
+      const master::ConnectToMasterResponsePB& connect_response)> 
LeaderCallback;
   // The host and port of the leader master server is stored in
   // 'leader_master', which must remain valid for the lifetime of this
   // object.
@@ -86,25 +90,23 @@ class ConnectToClusterRpc : public rpc::Rpc,
 
   virtual void SendRpcCb(const Status& status) OVERRIDE;
 
-  // Invoked when a response comes back from a Master with address
-  // 'node_addr'.
+  // Invoked when a response comes back from the master with index
+  // 'master_idx'.
   //
   // Invokes SendRpcCb if the response indicates that the specified
   // master is a leader, or if responses have been received from all
   // of the Masters.
-  void SingleNodeCallback(const Sockaddr& node_addr,
-                          const master::ConnectToMasterResponsePB& resp,
-                          const Status& status);
+  void SingleNodeCallback(int master_idx, const Status& status);
 
-  LeaderCallback user_cb_;
-  std::vector<Sockaddr> addrs_;
+  const LeaderCallback user_cb_;
 
-  HostPort leader_master_;
+  // The addresses of the masters.
+  const std::vector<Sockaddr> addrs_;
 
   // The amount of time alloted to each GetMasterRegistration RPC.
-  MonoDelta rpc_timeout_;
+  const MonoDelta rpc_timeout_;
 
-  // The received responses.
+  // The received responses. The indexes correspond to 'addrs_'.
   std::vector<master::ConnectToMasterResponsePB> responses_;
 
   // Number of pending responses.
@@ -114,6 +116,11 @@ class ConnectToClusterRpc : public rpc::Rpc,
   // RPC can be deallocated.
   bool completed_;
 
+  // The index of the master that was determined to be the leader.
+  // This corresponds to entries in 'responses_' and 'addrs_'.
+  // -1 indicates no leader found.
+  int leader_idx_ = -1;
+
   // Protects 'pending_responses_' and 'completed_'.
   mutable simple_spinlock lock_;
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/e95c29b6/src/kudu/integration-tests/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc 
b/src/kudu/integration-tests/external_mini_cluster.cc
index ff0cbae..ef4e287 100644
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ b/src/kudu/integration-tests/external_mini_cluster.cc
@@ -18,6 +18,7 @@
 #include "kudu/integration-tests/external_mini_cluster.h"
 
 #include <algorithm>
+#include <functional>
 #include <gtest/gtest.h>
 #include <memory>
 #include <rapidjson/document.h>
@@ -461,31 +462,25 @@ Status 
ExternalMiniCluster::WaitForTabletsRunning(ExternalTabletServer* ts,
   return Status::TimedOut(SecureDebugString(resp));
 }
 
-namespace {
-void LeaderMasterCallback(HostPort* dst_hostport,
-                          Synchronizer* sync,
-                          const Status& status,
-                          const HostPort& result) {
-  if (status.ok()) {
-    *dst_hostport = result;
-  }
-  sync->StatusCB(status);
-}
-} // anonymous namespace
-
 Status ExternalMiniCluster::GetLeaderMasterIndex(int* idx) {
   scoped_refptr<ConnectToClusterRpc> rpc;
   Synchronizer sync;
   vector<Sockaddr> addrs;
-  HostPort leader_master_hp;
+  Sockaddr leader_master_addr;
   MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
 
   for (const scoped_refptr<ExternalMaster>& master : masters_) {
     addrs.push_back(master->bound_rpc_addr());
   }
-  rpc.reset(new ConnectToClusterRpc(Bind(&LeaderMasterCallback,
-                                         &leader_master_hp,
-                                         &sync),
+  const auto& cb = [&](const Status& status,
+                       const Sockaddr& leader_master,
+                       const master::ConnectToMasterResponsePB& resp) {
+    if (status.ok()) {
+      leader_master_addr = leader_master;
+    }
+    sync.StatusCB(status);
+  };
+  rpc.reset(new ConnectToClusterRpc(cb,
                                     std::move(addrs),
                                     deadline,
                                     MonoDelta::FromSeconds(5),
@@ -494,7 +489,7 @@ Status ExternalMiniCluster::GetLeaderMasterIndex(int* idx) {
   RETURN_NOT_OK(sync.Wait());
   bool found = false;
   for (int i = 0; i < masters_.size(); i++) {
-    if (masters_[i]->bound_rpc_hostport().port() == leader_master_hp.port()) {
+    if (masters_[i]->bound_rpc_hostport().port() == leader_master_addr.port()) 
{
       found = true;
       *idx = i;
       break;

Reply via email to